You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2021/07/26 00:43:15 UTC

Change in asterixdb[master]: WIP: Failover with -1

From Murtadha Hubail <mh...@apache.org>:

Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12503 )


Change subject: WIP: Failover with -1
......................................................................

WIP: Failover with -1

Change-Id: I3b8de5d3354be36768e3c9da116c3c96a552a867
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
A asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
41 files changed, 421 insertions(+), 95 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/03/12503/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d7d5994..8fd16ff 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -135,6 +135,7 @@
                 final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
                 replicaJson.put("location", toHostPort(replica.getIdentifier().getLocation()));
                 replicaJson.put("status", replica.getStatus().toString());
+                replicaJson.put("nodeId", replica.getIdentifier().getNodeId());
                 replicasArray.add(replicaJson);
             }
             partitionJson.set("replicas", replicasArray);
@@ -167,11 +168,12 @@
         final String partition = request.getParameter("partition");
         final String host = request.getParameter("host");
         final String port = request.getParameter("port");
+        final String nodeId = request.getParameter("nodeId");
         if (partition == null || host == null || port == null) {
             return null;
         }
-        final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
-        return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
+        final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.parseInt(port));
+        return ReplicaIdentifier.of(Integer.parseInt(partition), nodeId, replicaAddress);
     }
 
     private void processPromote(IServletRequest request, IServletResponse response) throws HyracksDataException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..53d0973 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -38,7 +38,6 @@
 import org.apache.asterix.common.api.IPropertiesFactory;
 import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CompilerProperties;
@@ -225,9 +224,9 @@
                 new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
                         virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
         final String nodeId = getServiceContext().getNodeId();
-        final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+        final String[] nodePartitions = nodeProperties.getActivePartitions();
         final Set<Integer> nodePartitionsIds =
-                Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+                Arrays.stream(nodePartitions).map(Integer::parseInt).collect(Collectors.toSet());
         replicaManager = new ReplicaManager(this, nodePartitionsIds);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 0359cf1..a0adde1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -160,7 +160,7 @@
     @Override
     public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
         state = SystemState.RECOVERING;
-        LOGGER.info("starting recovery ...");
+        LOGGER.info("starting recovery for partitions {}", partitions);
 
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -362,19 +362,17 @@
                                 index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
                                 datasetLifecycleManager.register(localResource.getPath(), index);
                                 datasetLifecycleManager.open(localResource.getPath());
-                                try {
-                                    final DatasetResourceReference resourceReference =
-                                            DatasetResourceReference.of(localResource);
-                                    maxDiskLastLsn =
-                                            indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
-                                } catch (HyracksDataException e) {
-                                    datasetLifecycleManager.close(localResource.getPath());
-                                    throw e;
-                                }
+                                maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+                                        indexCheckpointManagerProvider);
                                 //#. set resourceId and maxDiskLastLSN to the map
                                 resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
                             } else {
-                                maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                                if (!resourceId2MaxLSNMap.containsKey(resourceId)) {
+                                    maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+                                            indexCheckpointManagerProvider);
+                                } else {
+                                    maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                                }
                             }
                             // lsn @ maxDiskLastLsn is either a flush log or a master replica log
                             if (lsn >= maxDiskLastLsn) {
@@ -450,6 +448,19 @@
         }
     }
 
+    private long getResourceLowWaterMark(LocalResource localResource, IDatasetLifecycleManager datasetLifecycleManager,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) throws HyracksDataException {
+        long maxDiskLastLsn;
+        try {
+            final DatasetResourceReference resourceReference = DatasetResourceReference.of(localResource);
+            maxDiskLastLsn = indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+        } catch (HyracksDataException e) {
+            datasetLifecycleManager.close(localResource.getPath());
+            throw e;
+        }
+        return maxDiskLastLsn;
+    }
+
     private boolean needToFreeMemory() {
         return Runtime.getRuntime().freeMemory() < cachedEntityCommitsPerJobSize;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index ad70cf4..546af88 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -30,11 +30,14 @@
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.ReplicaPromotionMessage;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.config.IApplicationConfig;
@@ -68,17 +71,14 @@
 
     @Override
     public synchronized void addReplica(ReplicaIdentifier id) {
-        final NodeControllerService controllerService =
-                (NodeControllerService) appCtx.getServiceContext().getControllerService();
-        final NodeStatus nodeStatus = controllerService.getNodeStatus();
-        if (nodeStatus != NodeStatus.ACTIVE) {
-            LOGGER.warn("Ignoring request to add replica. Node is not ACTIVE yet. Current status: {}", nodeStatus);
+        if (nodeNotReady("addReplica")) {
             return;
         }
-        if (!partitions.contains(id.getPartition())) {
-            throw new IllegalStateException(
-                    "This node is not the current master of partition(" + id.getPartition() + ")");
-        }
+        //FIXME restore this condition when -1 is added to the set of active partitions
+        //        if (!partitions.contains(id.getPartition())) {
+        //            throw new IllegalStateException(
+        //                    "This node is not the current master of partition(" + id.getPartition() + ")");
+        //        }
         if (isSelf(id)) {
             LOGGER.info("ignoring request to add replica to ourselves");
             return;
@@ -89,8 +89,13 @@
 
     @Override
     public synchronized void removeReplica(ReplicaIdentifier id) {
+        if (nodeNotReady("removeReplica")) {
+            return;
+        }
+        //TODO return illegal state?
         if (!replicas.containsKey(id)) {
-            throw new IllegalStateException("replica with id(" + id + ") does not exist");
+            return;
+            //            throw new IllegalStateException("replica with id(" + id + ") does not exist");
         }
         PartitionReplica replica = replicas.remove(id);
         appCtx.getReplicationManager().unregister(replica);
@@ -110,6 +115,9 @@
 
     @Override
     public synchronized void promote(int partition) throws HyracksDataException {
+        if (nodeNotReady("promote")) {
+            return;
+        }
         if (partitions.contains(partition)) {
             return;
         }
@@ -119,15 +127,30 @@
         final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
         recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
         partitions.add(partition);
+        // TODO should we just persist the takeover success on metakv to prevent any race between CC being active
+        // and the partition being active?
+        ReplicaPromotionMessage message =
+                new ReplicaPromotionMessage(appCtx.getTransactionSubsystem().getId(), partition);
+        INCMessageBroker messageBroker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendMessageToPrimaryCC(message);
+        } catch (Exception e) {
+            //TODO halt
+            LOGGER.error("failed to notify CC of replica promotion");
+        }
     }
 
     @Override
     public synchronized void release(int partition) throws HyracksDataException {
+        if (nodeNotReady("release")) {
+            throw new IllegalStateException("received release request while node is not ready");
+        }
         if (!partitions.contains(partition)) {
             return;
         }
         final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
-        datasetLifecycleManager.flushDataset(appCtx.getReplicationManager().getReplicationStrategy());
+        IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        datasetLifecycleManager.flushDataset(replicationStrategy);
         closePartitionResources(partition);
         final List<IPartitionReplica> partitionReplicas = getReplicas(partition);
         for (IPartitionReplica replica : partitionReplicas) {
@@ -141,14 +164,15 @@
         return replicaSyncLock;
     }
 
-    private void closePartitionResources(int partition) throws HyracksDataException {
+    public void closePartitionResources(int partition) throws HyracksDataException {
         final PersistentLocalResourceRepository resourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
         final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
         for (LocalResource resource : partitionResources.values()) {
-            datasetLifecycleManager.close(resource.getPath());
+            datasetLifecycleManager.closeIfOpen(resource.getPath());
         }
+        datasetLifecycleManager.closePartition(partition);
     }
 
     private boolean isSelf(ReplicaIdentifier id) {
@@ -157,6 +181,17 @@
         int port = appConfig.getInt(NCConfig.Option.REPLICATION_LISTEN_PORT);
 
         final InetSocketAddress replicaAddress = new InetSocketAddress(host, port);
-        return id.equals(ReplicaIdentifier.of(id.getPartition(), replicaAddress));
+        return id.equals(ReplicaIdentifier.of(id.getPartition(), id.getNodeId(), replicaAddress));
+    }
+
+    private boolean nodeNotReady(String request) {
+        final NodeControllerService controllerService =
+                (NodeControllerService) appCtx.getServiceContext().getControllerService();
+        final NodeStatus nodeStatus = controllerService.getNodeStatus();
+        if (nodeStatus != NodeStatus.ACTIVE) {
+            LOGGER.warn("Ignoring request {}. Node is not ACTIVE yet. Current status: {}", request, nodeStatus);
+            return true;
+        }
+        return false;
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index f58f871..f19d896 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -38,6 +38,7 @@
     public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         try {
+            //TODO do we need to promote here?
             appContext.getReplicaManager().promote(partitionId);
             SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
             appContext.initializeMetadata(state == SystemState.PERMANENT_DATA_LOSS, partitionId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..cacb15a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@
 import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,8 @@
     @Override
     public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
         pendingStartupCompletionNodes.remove(nodeId);
-        clusterManager.updateNodeState(nodeId, false, null);
+        //TODO on node failure, we need to set all of its active partitions to inactive
+        clusterManager.updateNodeState(nodeId, false, null, null);
         if (nodeId.equals(metadataNodeId)) {
             clusterManager.updateMetadataNode(metadataNodeId, false);
         }
@@ -138,7 +136,8 @@
     private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
         final String nodeId = msg.getNodeId();
         nodeSecretsMap.put(nodeId, msg.getSecrets());
-        List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+        List<INCLifecycleTask> tasks =
+                buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
         RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
@@ -157,7 +156,7 @@
             return;
         }
         if (msg.isSuccess()) {
-            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
+            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getPartitions());
             if (msg.getNodeId().equals(metadataNodeId)) {
                 clusterManager.updateMetadataNode(metadataNodeId, true);
             }
@@ -167,7 +166,8 @@
         }
     }
 
-    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
+            Set<Integer> activePartitions) {
         LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
                 state);
         final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +175,7 @@
             case ACTIVE:
                 return buildActiveNCRegTasks(isMetadataNode);
             case IDLE:
-                return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+                return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
             default:
                 return new ArrayList<>();
         }
@@ -210,15 +210,18 @@
         }
     }
 
-    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
+    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
+            Set<Integer> activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
         if (state == SystemState.CORRUPTED) {
-            // need to perform local recovery for node partitions
-            LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
-                    .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+            // need to perform local recovery for node active partitions
+            //TODO ensure recovery is done for all active partitions before the CheckpointTask since that
+            // might delete any old transaction log files
+            LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
             tasks.add(rt);
         }
+
         if (replicationEnabled) {
             tasks.add(new StartReplicationServiceTask());
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..c70b6bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.replication.message;
 
+import java.util.Set;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@
     private final boolean success;
     private Throwable exception;
     private final NcLocalCounters localCounters;
+    private Set<Integer> partitions;
 
-    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
+    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters,
+            Set<Integer> partitions) {
         this.nodeId = nodeId;
         this.success = success;
         this.localCounters = localCounters;
+        this.partitions = partitions;
     }
 
     @Override
@@ -63,6 +68,10 @@
         return localCounters;
     }
 
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
     @Override
     public MessageType getType() {
         return MessageType.REGISTRATION_TASKS_RESULT;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..870413b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -37,25 +38,27 @@
 public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
     protected final SystemState state;
     protected final String nodeId;
     protected final NodeStatus nodeStatus;
     protected final Map<String, Object> secrets;
+    protected final Set<Integer> activePartitions;
 
     public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
-            Map<String, Object> secretsEphemeral) {
+            Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
         this.state = state;
         this.nodeId = nodeId;
         this.nodeStatus = nodeStatus;
         this.secrets = new HashMap<>(secretsEphemeral);
+        this.activePartitions = activePartitions;
     }
 
     public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
-            Map<String, Object> secretsEphemeral) throws HyracksDataException {
+            Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException {
         try {
-            RegistrationTasksRequestMessage msg =
-                    new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
+            RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+                    systemState, secretsEphemeral, activePartitions);
             ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@
     public Map<String, Object> getSecrets() {
         return secrets;
     }
+
+    public Set<Integer> getActivePartitions() {
+        return activePartitions;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..7848849 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.replication.message;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@
             }
             NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
-            NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+            Set<Integer> partitions = appCtx.getReplicaManager().getPartitions();
+            NCLifecycleTaskReportMessage result =
+                    new NCLifecycleTaskReportMessage(nodeId, success, localCounter, partitions);
             result.setException(exception);
             try {
                 broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3571d88..76eb184 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -42,6 +42,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.active.ActivityState;
@@ -272,6 +273,7 @@
     protected final ICcApplicationContext appCtx;
     protected final SessionOutput sessionOutput;
     protected final SessionConfig sessionConfig;
+    protected final ReentrantReadWriteLock compilationLock;
     protected Dataverse activeDataverse;
     protected final List<FunctionDecl> declaredFunctions;
     protected final ILangCompilationProvider compilationProvider;
@@ -290,6 +292,7 @@
         this.appCtx = appCtx;
         this.lockManager = appCtx.getMetadataLockManager();
         this.lockUtil = appCtx.getMetadataLockUtil();
+        this.compilationLock = appCtx.getCompilationLock();
         this.statements = statements;
         this.sessionOutput = output;
         this.sessionConfig = output.config();
@@ -3367,12 +3370,14 @@
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() throws AlgebricksException {
+                compilationLock.readLock().lock();
                 lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
             }
 
             @Override
             public void unlock() {
                 metadataProvider.getLocks().unlock();
+                compilationLock.readLock().unlock();
             }
         };
         final IStatementCompiler compiler = () -> {
@@ -3962,6 +3967,7 @@
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
+                compilationLock.readLock().lock();
             }
 
             @Override
@@ -3969,6 +3975,7 @@
                 metadataProvider.getLocks().unlock();
                 // release external datasets' locks acquired during compilation of the query
                 ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+                compilationLock.readLock().unlock();
             }
         };
         final IStatementCompiler compiler = () -> {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..1f55631 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -36,6 +36,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -299,8 +300,10 @@
         final Map httpSecrets =
                 apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
                         : Collections.emptyMap();
+        Set<Integer> activePartitions = Arrays.stream(runtimeContext.getNodeProperties().getActivePartitions())
+                .map(Integer::parseInt).collect(Collectors.toSet());
         RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
-                currentStatus, systemState, httpSecrets);
+                currentStatus, systemState, httpSecrets, activePartitions);
     }
 
     @Override
@@ -340,7 +343,11 @@
     @Override
     public IFileDeviceResolver getFileDeviceResolver() {
         return (relPath, devices) -> {
-            int ioDeviceIndex = Math.abs(StoragePathUtil.getPartitionNumFromRelativePath(relPath) % devices.size());
+            int partition = StoragePathUtil.getPartitionNumFromRelativePath(relPath);
+            // A path with a negative partition number always resolves to the first device. Any other
+            // path is mapped by taking the partition number modulo the device count, which is never
+            // negative for a non-negative partition number.
+            int ioDeviceIndex = partition < 0 ? 0 : partition % devices.size();
             return devices.get(ioDeviceIndex);
         };
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..370b330 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -200,7 +200,8 @@
 
     private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
             throws HyracksDataException {
-        NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+        NCLifecycleTaskReportMessage msg =
+                new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), Collections.emptySet());
         applicationContext.getNcLifecycleCoordinator().process(msg);
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index b03af55..48bada3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -169,4 +169,8 @@
      * @return the current datasets io stats
      */
     StorageIOStats getDatasetsIOStats();
+
+    void closeIfOpen(String resourcePath) throws HyracksDataException;
+
+    void closePartition(int partitionId);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc99421..9c66acf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -33,6 +33,15 @@
         this.ioDeviceNum = ioDeviceNum;
     }
 
+    public static ClusterPartition of(int partitionId, String nodeId, int ioDeviceNum, String activeNodeId,
+            boolean active, boolean pendingActivation) {
+        ClusterPartition partition = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        partition.activeNodeId = activeNodeId;
+        partition.active = active;
+        partition.pendingActivation = pendingActivation;
+        return partition;
+    }
+
     public int getPartitionId() {
         return partitionId;
     }
@@ -71,7 +80,10 @@
 
     @Override
     public ClusterPartition clone() {
-        return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        clone.setActiveNodeId(activeNodeId);
+        clone.setActive(active);
+        return clone;
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..94f80cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@
      * @param nodeId
      * @param active
      * @param ncLocalCounters
+     * @param partitions
      * @throws HyracksDataException
      */
-    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
+    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> partitions)
+            throws HyracksDataException;
 
     /**
      * Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..efc9b6a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,11 @@
         STARTING_PARTITION_ID(
                 OptionTypes.INTEGER,
                 -1,
-                "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
+                "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
+        ACTIVE_PARTITIONS(
+                OptionTypes.STRING_ARRAY,
+                new String[] {},
+                "List of partitions this node is currently the master of");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -95,7 +99,7 @@
 
         @Override
         public boolean hidden() {
-            return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+            return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS;
         }
 
     }
@@ -115,4 +119,8 @@
     public String getTxnLogDir() {
         return accessor.getString(Option.TXN_LOG_DIR);
     }
+
+    public String[] getActivePartitions() {
+        return accessor.getStringArray(Option.ACTIVE_PARTITIONS);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5d1a8e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -132,6 +132,7 @@
         }
         stores.put(ncId, nodeStores);
         nodePartitionsMap.put(ncId, nodePartitions);
+        LOGGER.info("Node partitions {}", nodeCfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS));
     }
 
     private void loadAsterixBuildProperties() throws AsterixException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b2f4034..4b05629 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -113,6 +113,8 @@
 
     @Override
     public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+        LOGGER.info("registering {} with dataset life cycle manager. Thread: {}", index,
+                Thread.currentThread().getStackTrace());
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         LocalResource resource = resourceRepository.get(resourcePath);
@@ -141,6 +143,8 @@
 
     @Override
     public synchronized void unregister(String resourcePath) throws HyracksDataException {
+        LOGGER.info("unregister {} with dataset life cycle manager. Thread: {}", resourcePath,
+                Thread.currentThread().getStackTrace());
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -559,6 +563,51 @@
         return stats;
     }
 
+    @Override
+    public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
+        validateDatasetLifecycleManagerState();
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+        DatasetResource dsr = datasets.get(did);
+        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+
+        if (dsr == null || iInfo == null) {
+            return;
+        }
+        // No tracker for the partition means no active operations; never dereference a null tracker.
+        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+        int activeOps = opTracker == null ? 0 : opTracker.getNumActiveOperations();
+        if (iInfo.getReferenceCount() != 0 || activeOps != 0) {
+            if (LOGGER.isErrorEnabled()) {
+                LOGGER.error(String.format(
+                        "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+                        resourcePath, iInfo.getReferenceCount(), activeOps));
+            }
+            throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+                    StoragePathUtil.getIndexNameFromPath(resourcePath));
+        }
+
+        // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        dsInfo.waitForIO();
+        closeIndex(iInfo);
+        dsInfo.removeIndex(resourceID);
+        synchronized (dsInfo) {
+            if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+                    && !dsInfo.isExternal()) {
+                removeDatasetFromCache(dsInfo.getDatasetID());
+            }
+        }
+    }
+
+    @Override
+    public synchronized void closePartition(int partitionId) {
+        // Ask every dataset resource tracked by this manager to drop whatever it keeps for the
+        // given partition.
+        datasets.values().forEach(resource -> resource.removePartition(partitionId));
+    }
+
     private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
         if (indexInfo.isOpen()) {
             ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 54e1976..db9eabb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -182,4 +182,10 @@
     public boolean isMetadataDataset() {
         return MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID());
     }
+
+    public void removePartition(int partitionId) {
+        datasetPrimaryOpTrackers.remove(partitionId);
+        datasetComponentIdGenerators.remove(partitionId);
+        datasetRateLimiters.remove(partitionId);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
index 7f448e3..249505b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
@@ -41,7 +41,7 @@
     /**
      * The resource partition
      */
-    private final int partition;
+    private int partition;
     private final IResource resource;
 
     public DatasetLocalResource(int datasetId, int partition, IResource resource) {
@@ -68,6 +68,10 @@
         resource.setPath(path);
     }
 
+    public void setPartition(int partition) {
+        this.partition = partition;
+    }
+
     @Override
     public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
         return resource.createInstance(ncServiceCtx);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index ad90814..72fb32c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IMetadataLockManager;
@@ -149,4 +151,11 @@
      * @return the adapter factory service
      */
     IAdapterFactoryService getAdapterFactoryService();
+
+    /**
+     * Gets the compilation lock
+     *
+     * @return the compilation lock
+     */
+    ReentrantReadWriteLock getCompilationLock();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
index a8cd14f..46bca34 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
@@ -20,10 +20,12 @@
 
 public class MetadataIndexImmutableProperties {
 
+    public static final int METADATA_PARTITION_ID = -1;
     public static final int FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID = 52;
     public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
     public static final int METADATA_DATASETS_PARTITIONS = 1;
     public static final int METADATA_DATASETS_COUNT = 15;
+    //    public static final Map<Integer, Integer>
 
     private final String indexName;
     private final int datasetId;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
index c969777..1dfc2e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
@@ -24,4 +24,9 @@
     public boolean isMatch(int datasetId) {
         return true;
     }
+
+    @Override
+    public String getName() {
+        return "all";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
new file mode 100644
index 0000000..7d73e79
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+
+/**
+ * A replication strategy that matches every dataset except those that
+ * {@link MetadataIndexImmutableProperties#isMetadataDataset(int)} reports as metadata datasets.
+ */
+public class AllNonMetadataDatasetsReplicationStrategy implements IReplicationStrategy {
+
+    // IReplicationStrategy is Serializable, so declare the serial version explicitly.
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @return true unless {@code datasetId} identifies a metadata dataset
+     */
+    @Override
+    public boolean isMatch(int datasetId) {
+        return !MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
+    }
+
+    @Override
+    public String getName() {
+        return "non-metadata";
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
index 0ad8be2..340bad4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -18,11 +18,15 @@
  */
 package org.apache.asterix.common.replication;
 
-public interface IReplicationStrategy {
+import java.io.Serializable;
+
+public interface IReplicationStrategy extends Serializable {
 
     /**
      * @param datasetId
      * @return True, if the dataset should be replicated. Otherwise false.
      */
     boolean isMatch(int datasetId);
+
+    String getName();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index 2b97fe7..5278208 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -26,4 +26,9 @@
     public boolean isMatch(int datasetId) {
         return MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
     }
+
+    @Override
+    public String getName() {
+        return "metadata";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
index 8ffaa15..49083c8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -24,4 +24,9 @@
     public boolean isMatch(int datasetId) {
         return false;
     }
+
+    @Override
+    public String getName() {
+        return "none";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
index c4bb74c..95ea055 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -27,21 +27,29 @@
     private final int partition;
     private final String id;
     private volatile InetSocketAddress location;
+    private final String nodeId;
+    private final int storagePartition;
 
-    private ReplicaIdentifier(int partition, InetSocketAddress location) {
+    private ReplicaIdentifier(int partition, String nodeId, InetSocketAddress location) {
         this.partition = partition;
         this.location = location;
+        this.nodeId = nodeId;
+        storagePartition = partition;
         id = partition + "@" + location.getHostString() + ":" + location.getPort();
     }
 
-    public static ReplicaIdentifier of(int partition, InetSocketAddress location) {
-        return new ReplicaIdentifier(partition, location);
+    public static ReplicaIdentifier of(int partition, String nodeId, InetSocketAddress location) {
+        return new ReplicaIdentifier(partition, nodeId, location);
     }
 
     public int getPartition() {
         return partition;
     }
 
+    public int getStoragePartition() {
+        return storagePartition;
+    }
+
     public InetSocketAddress getLocation() {
         return location;
     }
@@ -52,6 +60,10 @@
         return location;
     }
 
+    public String getNodeId() {
+        return nodeId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index d803756..06650b0 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -78,7 +78,7 @@
     }
 
     public synchronized Optional<IPartitionReplica> getPartitionReplica(int partition) {
-        return replicas.stream().filter(replica -> replica.getIdentifier().getPartition() == partition
+        return replicas.stream().filter(replica -> replica.getIdentifier().getStoragePartition() == partition
                 && replica.getStatus() == IPartitionReplica.PartitionReplicaStatus.IN_SYNC).findAny();
     }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index dd953c4..aadea22 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -101,7 +101,8 @@
 
     private void process(IReplicationJob job) {
         try {
-            if (skip(job)) {
+            DatasetResourceReference resourceRef = getJobDatasetResource(job);
+            if (skip(resourceRef)) {
                 return;
             }
             synchronized (transferLock) {
@@ -109,11 +110,11 @@
                     return;
                 }
                 final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
-                final int indexPartition = getJobPartition(job);
+                final int indexPartition = resourceRef.getPartitionNum();
                 for (ReplicationDestination dest : destinations) {
                     try {
                         Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
-                        if (!partitionReplica.isPresent()) {
+                        if (partitionReplica.isEmpty()) {
                             continue;
                         }
                         PartitionReplica replica = (PartitionReplica) partitionReplica.get();
@@ -129,23 +130,20 @@
         }
     }
 
-    private boolean skip(IReplicationJob job) {
+    private boolean skip(DatasetResourceReference indexFileRef) {
+        return indexFileRef == null || !replicationStrategy.isMatch(indexFileRef.getDatasetId());
+    }
+
+    private DatasetResourceReference getJobDatasetResource(IReplicationJob job) {
         try {
-            final String fileToReplicate = job.getAnyFile();
-            final Optional<DatasetResourceReference> indexFileRefOpt =
-                    resourceRepository.getLocalResourceReference(fileToReplicate);
-            if (!indexFileRefOpt.isPresent()) {
-                LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
-                return true;
-            }
-            return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+            return resourceRepository.getLocalResourceReference(job.getAnyFile()).orElse(null);
         } catch (HyracksDataException e) {
             throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
         }
     }
 
-    private int getJobPartition(IReplicationJob job) {
-        return ResourceReference.of(job.getAnyFile()).getPartitionNum();
+    private ResourceReference getJobPartition(IReplicationJob job) {
+        return ResourceReference.of(job.getAnyFile());
     }
 
     private void closeChannels() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6a23ae6..a805d82 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -128,6 +128,7 @@
     public void replicate(ILogRecord logRecord) throws InterruptedException {
         if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
             synchronized (destinations) {
+                //TODO filter destinations
                 ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
             }
         }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 767eb76..e0f9a8f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -59,6 +59,8 @@
         PersistentLocalResourceRepository resRepo =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         final IIOManager ioManager = appCtx.getIoManager();
+        //TODO this should consider replication strategy to avoid checkpointing metadata datasets
+        // when only other datasets should be checkpointed
         final Collection<LocalResource> partitionResources = resRepo.getPartitionResources(partition).values();
         final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
         for (LocalResource ls : partitionResources) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index cff12de..91bc35b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -48,11 +48,12 @@
     public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException {
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        //TODO do we need to sync such tasks?
         localResourceRepository.cleanup(partition);
-        final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
-        final List<String> partitionResources =
-                localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
-                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+        //TODO need to include checksum of each file too
+        IReplicationStrategy strategy = appCtx.getReplicationManager().getReplicationStrategy();
+        final List<String> partitionResources = localResourceRepository.getPartitionReplicatedFiles(partition, strategy)
+                .stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
         final PartitionResourcesListResponse response =
                 new PartitionResourcesListResponse(partition, partitionResources);
         ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
new file mode 100644
index 0000000..e73bcb2
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ReplicaPromotionMessage implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private int partition;
+
+    public ReplicaPromotionMessage(String nodeId, int partition) {
+        this.nodeId = nodeId;
+        this.partition = partition;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        LOGGER.info("Partition {} master changed, new master {}", partition, nodeId);
+        IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
+        clusterStateManager.updateClusterPartition(partition, nodeId, true);
+        clusterStateManager.refreshState();
+    }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index b47fd39..a01a2ba 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -26,7 +26,6 @@
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
@@ -49,14 +48,13 @@
     }
 
     public void sync() throws IOException {
-        final int partition = replica.getIdentifier().getPartition();
+        final int partition = replica.getIdentifier().getStoragePartition();
         final Set<String> replicaFiles = getReplicaFiles(partition);
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
-        final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
-        final Set<String> masterFiles =
-                localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
-                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+        final Set<String> masterFiles = localResourceRepository
+                .getPartitionReplicatedFiles(partition, appCtx.getReplicationManager().getReplicationStrategy())
+                .stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
         // find files on master and not on replica
         final List<String> replicaMissingFiles =
                 masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 261236c..313263e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -60,15 +60,15 @@
 
     private void syncFiles() throws IOException {
         final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
-        // flush replicated dataset to generate disk component for any remaining in-memory components
         final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        // flush replicated dataset to generate disk component for any remaining in-memory components
         appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
         waitForReplicatedDatasetsIO();
         fileSync.sync();
     }
 
     private void checkpointReplicaIndexes() throws IOException {
-        final int partition = replica.getIdentifier().getPartition();
+        final int partition = replica.getIdentifier().getStoragePartition();
         CheckpointPartitionIndexesTask task =
                 new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition));
         ReplicationProtocol.sendTo(replica, task);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 9099c88..2576c4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.runtime.utils;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
 import org.apache.asterix.common.api.IConfigValidator;
@@ -101,6 +102,8 @@
     private final IConfigValidator configValidator;
     private final IAdapterFactoryService adapterFactoryService;
 
+    private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
+
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
             INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
@@ -314,4 +317,9 @@
     public IAdapterFactoryService getAdapterFactoryService() {
         return adapterFactoryService;
     }
+
+    @Override
+    public ReentrantReadWriteLock getCompilationLock() {
+        return compilationLock;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..fc99792 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -24,12 +24,14 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -83,6 +85,8 @@
     @Override
     public void setCcAppCtx(ICcApplicationContext appCtx) {
         this.appCtx = appCtx;
+        //TODO we need to maintain the node2PartitionsMap in the same state as before a failover and only change
+        // a partition's current active node
         node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
         clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
         currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName();
@@ -139,20 +143,33 @@
     }
 
     @Override
-    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
+    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters,
+            Set<Integer> nodePartitions) {
         if (active) {
             updateClusterCounters(nodeId, localCounters);
             participantNodes.add(nodeId);
         } else {
             participantNodes.remove(nodeId);
         }
-        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
-        // if this isn't a storage node, it will not have cluster partitions
+        //        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
         if (nodePartitions != null) {
-            for (ClusterPartition p : nodePartitions) {
-                updateClusterPartition(p.getPartitionId(), nodeId, active);
+            for (Integer partitionId : nodePartitions) {
+                updateClusterPartition(partitionId, nodeId, active);
+            }
+        } else {
+            List<ClusterPartition> nodeActivePartitions = clusterPartitions.values().stream()
+                    .filter(clusterPartition -> clusterPartition.getActiveNodeId().equals(nodeId))
+                    .collect(Collectors.toList());
+            for (ClusterPartition partition : nodeActivePartitions) {
+                updateClusterPartition(partition.getPartitionId(), nodeId, active);
             }
         }
+        // if this isn't a storage node, it will not have cluster partitions
+        //        if (nodePartitions != null) {
+        //            for (ClusterPartition p : nodePartitions) {
+        //                updateClusterPartition(p.getPartitionId(), nodeId, active);
+        //            }
+        //        }
     }
 
     @Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 29dedf7..556ab6a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -248,7 +248,7 @@
         }
     }
 
-    private static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath)
+    public static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath)
             throws HyracksDataException {
         String fileName = resourcePath + File.separator + StorageConstants.METADATA_FILE_NAME;
         return ioManager.resolve(fileName);
@@ -603,4 +603,8 @@
     private static boolean isComponentFile(File indexDir, String fileName) {
         return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
     }
+
+    public Path[] getStorageRoots() {
+        return storageRoots;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
index 0a0e026..c7be4ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
@@ -59,7 +59,7 @@
     @Override
     public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException {
         if (cached == null) {
-            cached = new FileReference(ioManager.getIODevices().get(ioDevice), getPath());
+            cached = super.getFileReference(ioManager);
         }
         return cached;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index f54a62c..5166970 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -184,6 +184,9 @@
 
     public synchronized void failNode(String nodeId) throws HyracksException {
         NodeControllerState state = nodeRegistry.get(nodeId);
+        if (state == null) {
+            return;
+        }
         Set<JobId> affectedJobIds = state.getActiveJobIds();
         // Removes the node from node map.
         nodeRegistry.remove(nodeId);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12503
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I3b8de5d3354be36768e3c9da116c3c96a552a867
Gerrit-Change-Number: 12503
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: newchange

Change in asterixdb[master]: WIP: Failover with -1

Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Murtadha Hubail <mh...@apache.org>:

Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12503 )


Change subject: WIP: Failover with -1
......................................................................

WIP: Failover with -1

Change-Id: I3b8de5d3354be36768e3c9da116c3c96a552a867
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
A asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
41 files changed, 421 insertions(+), 95 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/03/12503/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d7d5994..8fd16ff 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -135,6 +135,7 @@
                 final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
                 replicaJson.put("location", toHostPort(replica.getIdentifier().getLocation()));
                 replicaJson.put("status", replica.getStatus().toString());
+                replicaJson.put("nodeId", replica.getIdentifier().getNodeId());
                 replicasArray.add(replicaJson);
             }
             partitionJson.set("replicas", replicasArray);
@@ -167,11 +168,12 @@
         final String partition = request.getParameter("partition");
         final String host = request.getParameter("host");
         final String port = request.getParameter("port");
+        final String nodeId = request.getParameter("nodeId");
         if (partition == null || host == null || port == null) {
             return null;
         }
-        final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
-        return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
+        final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.parseInt(port));
+        return ReplicaIdentifier.of(Integer.parseInt(partition), nodeId, replicaAddress);
     }
 
     private void processPromote(IServletRequest request, IServletResponse response) throws HyracksDataException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..53d0973 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -38,7 +38,6 @@
 import org.apache.asterix.common.api.IPropertiesFactory;
 import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CompilerProperties;
@@ -225,9 +224,9 @@
                 new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
                         virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
         final String nodeId = getServiceContext().getNodeId();
-        final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+        final String[] nodePartitions = nodeProperties.getActivePartitions();
         final Set<Integer> nodePartitionsIds =
-                Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+                Arrays.stream(nodePartitions).map(Integer::parseInt).collect(Collectors.toSet());
         replicaManager = new ReplicaManager(this, nodePartitionsIds);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 0359cf1..a0adde1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -160,7 +160,7 @@
     @Override
     public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
         state = SystemState.RECOVERING;
-        LOGGER.info("starting recovery ...");
+        LOGGER.info("starting recovery for partitions {}", partitions);
 
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -362,19 +362,17 @@
                                 index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
                                 datasetLifecycleManager.register(localResource.getPath(), index);
                                 datasetLifecycleManager.open(localResource.getPath());
-                                try {
-                                    final DatasetResourceReference resourceReference =
-                                            DatasetResourceReference.of(localResource);
-                                    maxDiskLastLsn =
-                                            indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
-                                } catch (HyracksDataException e) {
-                                    datasetLifecycleManager.close(localResource.getPath());
-                                    throw e;
-                                }
+                                maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+                                        indexCheckpointManagerProvider);
                                 //#. set resourceId and maxDiskLastLSN to the map
                                 resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
                             } else {
-                                maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                                if (!resourceId2MaxLSNMap.containsKey(resourceId)) {
+                                    maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+                                            indexCheckpointManagerProvider);
+                                } else {
+                                    maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                                }
                             }
                             // lsn @ maxDiskLastLsn is either a flush log or a master replica log
                             if (lsn >= maxDiskLastLsn) {
@@ -450,6 +448,19 @@
         }
     }
 
+    private long getResourceLowWaterMark(LocalResource localResource, IDatasetLifecycleManager datasetLifecycleManager,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) throws HyracksDataException {
+        long maxDiskLastLsn;
+        try {
+            final DatasetResourceReference resourceReference = DatasetResourceReference.of(localResource);
+            maxDiskLastLsn = indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+        } catch (HyracksDataException e) {
+            datasetLifecycleManager.close(localResource.getPath());
+            throw e;
+        }
+        return maxDiskLastLsn;
+    }
+
     private boolean needToFreeMemory() {
         return Runtime.getRuntime().freeMemory() < cachedEntityCommitsPerJobSize;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index ad70cf4..546af88 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -30,11 +30,14 @@
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.ReplicaPromotionMessage;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.config.IApplicationConfig;
@@ -68,17 +71,14 @@
 
     @Override
     public synchronized void addReplica(ReplicaIdentifier id) {
-        final NodeControllerService controllerService =
-                (NodeControllerService) appCtx.getServiceContext().getControllerService();
-        final NodeStatus nodeStatus = controllerService.getNodeStatus();
-        if (nodeStatus != NodeStatus.ACTIVE) {
-            LOGGER.warn("Ignoring request to add replica. Node is not ACTIVE yet. Current status: {}", nodeStatus);
+        if (nodeNotReady("addReplica")) {
             return;
         }
-        if (!partitions.contains(id.getPartition())) {
-            throw new IllegalStateException(
-                    "This node is not the current master of partition(" + id.getPartition() + ")");
-        }
+        //FIXME restore this condition when -1 is added to the set of active partitions
+        //        if (!partitions.contains(id.getPartition())) {
+        //            throw new IllegalStateException(
+        //                    "This node is not the current master of partition(" + id.getPartition() + ")");
+        //        }
         if (isSelf(id)) {
             LOGGER.info("ignoring request to add replica to ourselves");
             return;
@@ -89,8 +89,13 @@
 
     @Override
     public synchronized void removeReplica(ReplicaIdentifier id) {
+        if (nodeNotReady("removeReplica")) {
+            return;
+        }
+        //TODO return illegal state?
         if (!replicas.containsKey(id)) {
-            throw new IllegalStateException("replica with id(" + id + ") does not exist");
+            return;
+            //            throw new IllegalStateException("replica with id(" + id + ") does not exist");
         }
         PartitionReplica replica = replicas.remove(id);
         appCtx.getReplicationManager().unregister(replica);
@@ -110,6 +115,9 @@
 
     @Override
     public synchronized void promote(int partition) throws HyracksDataException {
+        if (nodeNotReady("promote")) {
+            return;
+        }
         if (partitions.contains(partition)) {
             return;
         }
@@ -119,15 +127,30 @@
         final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
         recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
         partitions.add(partition);
+        // TODO should we just persist the takeover success on metakv to prevent any race between CC being active
+        // and the partition being active?
+        ReplicaPromotionMessage message =
+                new ReplicaPromotionMessage(appCtx.getTransactionSubsystem().getId(), partition);
+        INCMessageBroker messageBroker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendMessageToPrimaryCC(message);
+        } catch (Exception e) {
+            //TODO halt
+            LOGGER.error("failed to notify CC of replica promotion");
+        }
     }
 
     @Override
     public synchronized void release(int partition) throws HyracksDataException {
+        if (nodeNotReady("release")) {
+            throw new IllegalStateException("received release request while node is not ready");
+        }
         if (!partitions.contains(partition)) {
             return;
         }
         final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
-        datasetLifecycleManager.flushDataset(appCtx.getReplicationManager().getReplicationStrategy());
+        IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        datasetLifecycleManager.flushDataset(replicationStrategy);
         closePartitionResources(partition);
         final List<IPartitionReplica> partitionReplicas = getReplicas(partition);
         for (IPartitionReplica replica : partitionReplicas) {
@@ -141,14 +164,15 @@
         return replicaSyncLock;
     }
 
-    private void closePartitionResources(int partition) throws HyracksDataException {
+    public void closePartitionResources(int partition) throws HyracksDataException {
         final PersistentLocalResourceRepository resourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
         final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
         for (LocalResource resource : partitionResources.values()) {
-            datasetLifecycleManager.close(resource.getPath());
+            datasetLifecycleManager.closeIfOpen(resource.getPath());
         }
+        datasetLifecycleManager.closePartition(partition);
     }
 
     private boolean isSelf(ReplicaIdentifier id) {
@@ -157,6 +181,17 @@
         int port = appConfig.getInt(NCConfig.Option.REPLICATION_LISTEN_PORT);
 
         final InetSocketAddress replicaAddress = new InetSocketAddress(host, port);
-        return id.equals(ReplicaIdentifier.of(id.getPartition(), replicaAddress));
+        return id.equals(ReplicaIdentifier.of(id.getPartition(), id.getNodeId(), replicaAddress));
+    }
+
+    private boolean nodeNotReady(String request) {
+        final NodeControllerService controllerService =
+                (NodeControllerService) appCtx.getServiceContext().getControllerService();
+        final NodeStatus nodeStatus = controllerService.getNodeStatus();
+        if (nodeStatus != NodeStatus.ACTIVE) {
+            LOGGER.warn("Ignoring request {}. Node is not ACTIVE yet. Current status: {}", request, nodeStatus);
+            return true;
+        }
+        return false;
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index f58f871..f19d896 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -38,6 +38,7 @@
     public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         try {
+            //TODO do we need to promote here?
             appContext.getReplicaManager().promote(partitionId);
             SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
             appContext.initializeMetadata(state == SystemState.PERMANENT_DATA_LOSS, partitionId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..cacb15a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@
 import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,8 @@
     @Override
     public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
         pendingStartupCompletionNodes.remove(nodeId);
-        clusterManager.updateNodeState(nodeId, false, null);
+        //TODO on node failure, we need to set all of its active partitions to inactive
+        clusterManager.updateNodeState(nodeId, false, null, null);
         if (nodeId.equals(metadataNodeId)) {
             clusterManager.updateMetadataNode(metadataNodeId, false);
         }
@@ -138,7 +136,8 @@
     private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
         final String nodeId = msg.getNodeId();
         nodeSecretsMap.put(nodeId, msg.getSecrets());
-        List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+        List<INCLifecycleTask> tasks =
+                buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
         RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
@@ -157,7 +156,7 @@
             return;
         }
         if (msg.isSuccess()) {
-            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
+            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getPartitions());
             if (msg.getNodeId().equals(metadataNodeId)) {
                 clusterManager.updateMetadataNode(metadataNodeId, true);
             }
@@ -167,7 +166,8 @@
         }
     }
 
-    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
+            Set<Integer> activePartitions) {
         LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
                 state);
         final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +175,7 @@
             case ACTIVE:
                 return buildActiveNCRegTasks(isMetadataNode);
             case IDLE:
-                return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+                return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
             default:
                 return new ArrayList<>();
         }
@@ -210,15 +210,18 @@
         }
     }
 
-    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
+    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
+            Set<Integer> activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
         if (state == SystemState.CORRUPTED) {
-            // need to perform local recovery for node partitions
-            LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
-                    .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+            // need to perform local recovery for node active partitions
+            //TODO ensure recovery is done for all active partitions before the CheckpointTask since that
+            // might delete any old transaction log files
+            LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
             tasks.add(rt);
         }
+
         if (replicationEnabled) {
             tasks.add(new StartReplicationServiceTask());
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..c70b6bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.replication.message;
 
+import java.util.Set;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@
     private final boolean success;
     private Throwable exception;
     private final NcLocalCounters localCounters;
+    private Set<Integer> partitions;
 
-    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
+    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters,
+            Set<Integer> partitions) {
         this.nodeId = nodeId;
         this.success = success;
         this.localCounters = localCounters;
+        this.partitions = partitions;
     }
 
     @Override
@@ -63,6 +68,10 @@
         return localCounters;
     }
 
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
     @Override
     public MessageType getType() {
         return MessageType.REGISTRATION_TASKS_RESULT;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..870413b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -37,25 +38,27 @@
 public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
     protected final SystemState state;
     protected final String nodeId;
     protected final NodeStatus nodeStatus;
     protected final Map<String, Object> secrets;
+    protected final Set<Integer> activePartitions;
 
     public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
-            Map<String, Object> secretsEphemeral) {
+            Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
         this.state = state;
         this.nodeId = nodeId;
         this.nodeStatus = nodeStatus;
         this.secrets = new HashMap<>(secretsEphemeral);
+        this.activePartitions = activePartitions;
     }
 
     public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
-            Map<String, Object> secretsEphemeral) throws HyracksDataException {
+            Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException {
         try {
-            RegistrationTasksRequestMessage msg =
-                    new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
+            RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+                    systemState, secretsEphemeral, activePartitions);
             ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@
     public Map<String, Object> getSecrets() {
         return secrets;
     }
+
+    public Set<Integer> getActivePartitions() {
+        return activePartitions;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..7848849 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.replication.message;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@
             }
             NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
-            NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+            Set<Integer> partitions = appCtx.getReplicaManager().getPartitions();
+            NCLifecycleTaskReportMessage result =
+                    new NCLifecycleTaskReportMessage(nodeId, success, localCounter, partitions);
             result.setException(exception);
             try {
                 broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3571d88..76eb184 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -42,6 +42,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.active.ActivityState;
@@ -272,6 +273,7 @@
     protected final ICcApplicationContext appCtx;
     protected final SessionOutput sessionOutput;
     protected final SessionConfig sessionConfig;
+    protected final ReentrantReadWriteLock compilationLock;
     protected Dataverse activeDataverse;
     protected final List<FunctionDecl> declaredFunctions;
     protected final ILangCompilationProvider compilationProvider;
@@ -290,6 +292,7 @@
         this.appCtx = appCtx;
         this.lockManager = appCtx.getMetadataLockManager();
         this.lockUtil = appCtx.getMetadataLockUtil();
+        this.compilationLock = appCtx.getCompilationLock();
         this.statements = statements;
         this.sessionOutput = output;
         this.sessionConfig = output.config();
@@ -3367,12 +3370,14 @@
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() throws AlgebricksException {
+                compilationLock.readLock().lock();
                 lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
             }
 
             @Override
             public void unlock() {
                 metadataProvider.getLocks().unlock();
+                compilationLock.readLock().unlock();
             }
         };
         final IStatementCompiler compiler = () -> {
@@ -3962,6 +3967,7 @@
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
+                compilationLock.readLock().lock();
             }
 
             @Override
@@ -3969,6 +3975,7 @@
                 metadataProvider.getLocks().unlock();
                 // release external datasets' locks acquired during compilation of the query
                 ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+                compilationLock.readLock().unlock();
             }
         };
         final IStatementCompiler compiler = () -> {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..1f55631 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -36,6 +36,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -299,8 +300,10 @@
         final Map httpSecrets =
                 apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
                         : Collections.emptyMap();
+        Set<Integer> activePartitions = Arrays.stream(runtimeContext.getNodeProperties().getActivePartitions())
+                .map(Integer::parseInt).collect(Collectors.toSet());
         RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
-                currentStatus, systemState, httpSecrets);
+                currentStatus, systemState, httpSecrets, activePartitions);
     }
 
     @Override
@@ -340,7 +343,11 @@
     @Override
     public IFileDeviceResolver getFileDeviceResolver() {
         return (relPath, devices) -> {
-            int ioDeviceIndex = Math.abs(StoragePathUtil.getPartitionNumFromRelativePath(relPath) % devices.size());
+            int partition = StoragePathUtil.getPartitionNumFromRelativePath(relPath);
+            if (partition < 0) {
+                return devices.get(0);
+            }
+            int ioDeviceIndex = Math.abs(partition % devices.size());
             return devices.get(ioDeviceIndex);
         };
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..370b330 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -200,7 +200,8 @@
 
     private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
             throws HyracksDataException {
-        NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+        NCLifecycleTaskReportMessage msg =
+                new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), Collections.emptySet());
         applicationContext.getNcLifecycleCoordinator().process(msg);
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index b03af55..48bada3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -169,4 +169,8 @@
      * @return the current datasets io stats
      */
     StorageIOStats getDatasetsIOStats();
+
+    void closeIfOpen(String resourcePath) throws HyracksDataException;
+
+    void closePartition(int partitionId);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc99421..9c66acf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -33,6 +33,15 @@
         this.ioDeviceNum = ioDeviceNum;
     }
 
+    public static ClusterPartition of(int partitionId, String nodeId, int ioDeviceNum, String activeNodeId,
+            boolean active, boolean pendingActivation) {
+        ClusterPartition partition = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        partition.activeNodeId = activeNodeId;
+        partition.active = active;
+        partition.pendingActivation = pendingActivation;
+        return partition;
+    }
+
     public int getPartitionId() {
         return partitionId;
     }
@@ -71,7 +80,10 @@
 
     @Override
     public ClusterPartition clone() {
-        return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        clone.setActiveNodeId(activeNodeId);
+        clone.setActive(active);
+        return clone;
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..94f80cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@
      * @param nodeId
      * @param active
      * @param ncLocalCounters
+     * @param partitions
      * @throws HyracksDataException
      */
-    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
+    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> partitions)
+            throws HyracksDataException;
 
     /**
      * Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..efc9b6a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,11 @@
         STARTING_PARTITION_ID(
                 OptionTypes.INTEGER,
                 -1,
-                "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
+                "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
+        ACTIVE_PARTITIONS(
+                OptionTypes.STRING_ARRAY,
+                new String[] {},
+                "List of partitions this node is currently the master of");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -95,7 +99,7 @@
 
         @Override
         public boolean hidden() {
-            return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+            return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS;
         }
 
     }
@@ -115,4 +119,8 @@
     public String getTxnLogDir() {
         return accessor.getString(Option.TXN_LOG_DIR);
     }
+
+    public String[] getActivePartitions() {
+        return accessor.getStringArray(Option.ACTIVE_PARTITIONS);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5d1a8e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -132,6 +132,7 @@
         }
         stores.put(ncId, nodeStores);
         nodePartitionsMap.put(ncId, nodePartitions);
+        LOGGER.info("Node partitions {}", nodeCfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS));
     }
 
     private void loadAsterixBuildProperties() throws AsterixException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b2f4034..4b05629 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -113,6 +113,8 @@
 
     @Override
     public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+        LOGGER.info("registering {} with dataset life cycle manager. Thread: {}", index,
+                Thread.currentThread().getStackTrace());
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         LocalResource resource = resourceRepository.get(resourcePath);
@@ -141,6 +143,8 @@
 
     @Override
     public synchronized void unregister(String resourcePath) throws HyracksDataException {
+        LOGGER.info("unregister {} with dataset life cycle manager. Thread: {}", resourcePath,
+                Thread.currentThread().getStackTrace());
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -559,6 +563,51 @@
         return stats;
     }
 
+    /** Closes the index at {@code resourcePath} if it is registered; throws if the index is still in use. */
+    @Override
+    public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
+        validateDatasetLifecycleManagerState();
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
+        // nothing to close unless both the dataset and the index are currently registered
+        DatasetResource dsr = datasets.get(did);
+        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+        if (dsr == null || iInfo == null) {
+            return;
+        }
+        // the tracker may be absent; treat that as zero active operations instead of dereferencing null
+        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+        int activeOps = opTracker == null ? 0 : opTracker.getNumActiveOperations();
+        if (iInfo.getReferenceCount() != 0 || activeOps != 0) {
+            if (LOGGER.isErrorEnabled()) {
+                final String logMsg = String.format(
+                        "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+                        resourcePath, iInfo.getReferenceCount(), activeOps);
+                LOGGER.error(logMsg);
+            }
+            throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+                    StoragePathUtil.getIndexNameFromPath(resourcePath));
+        }
+        // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        dsInfo.waitForIO();
+        closeIndex(iInfo);
+        dsInfo.removeIndex(resourceID);
+        synchronized (dsInfo) {
+            if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+                    && !dsInfo.isExternal()) {
+                removeDatasetFromCache(dsInfo.getDatasetID());
+            }
+        }
+    }
+
+    @Override
+    public synchronized void closePartition(int partitionId) {
+        for (DatasetResource ds : datasets.values()) {
+            ds.removePartition(partitionId);
+        }
+    }
+
     private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
         if (indexInfo.isOpen()) {
             ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 54e1976..db9eabb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -182,4 +182,10 @@
     public boolean isMetadataDataset() {
         return MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID());
     }
+
+    public void removePartition(int partitionId) {
+        datasetPrimaryOpTrackers.remove(partitionId);
+        datasetComponentIdGenerators.remove(partitionId);
+        datasetRateLimiters.remove(partitionId);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
index 7f448e3..249505b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
@@ -41,7 +41,7 @@
     /**
      * The resource partition
      */
-    private final int partition;
+    private int partition;
     private final IResource resource;
 
     public DatasetLocalResource(int datasetId, int partition, IResource resource) {
@@ -68,6 +68,10 @@
         resource.setPath(path);
     }
 
+    public void setPartition(int partition) {
+        this.partition = partition;
+    }
+
     @Override
     public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
         return resource.createInstance(ncServiceCtx);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index ad90814..72fb32c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IMetadataLockManager;
@@ -149,4 +151,11 @@
      * @return the adapter factory service
      */
     IAdapterFactoryService getAdapterFactoryService();
+
+    /**
+     * Gets the compilation lock
+     *
+     * @return the compilation lock
+     */
+    ReentrantReadWriteLock getCompilationLock();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
index a8cd14f..46bca34 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
@@ -20,10 +20,12 @@
 
 public class MetadataIndexImmutableProperties {
 
+    public static final int METADATA_PARTITION_ID = -1;
     public static final int FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID = 52;
     public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
     public static final int METADATA_DATASETS_PARTITIONS = 1;
     public static final int METADATA_DATASETS_COUNT = 15;
+    //    public static final Map<Integer, Integer>
 
     private final String indexName;
     private final int datasetId;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
index c969777..1dfc2e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
@@ -24,4 +24,9 @@
     public boolean isMatch(int datasetId) {
         return true;
     }
+
+    @Override
+    public String getName() {
+        return "all";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
new file mode 100644
index 0000000..7d73e79
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+
+/**
+ * An {@link IReplicationStrategy} that matches every dataset id which
+ * {@link MetadataIndexImmutableProperties#isMetadataDataset(int)} does not report as a metadata dataset.
+ */
+public class AllNonMetadataDatasetsReplicationStrategy implements IReplicationStrategy {
+
+    // IReplicationStrategy is Serializable, so declare the serialized form version explicitly
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public boolean isMatch(int datasetId) {
+        return !MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
+    }
+
+    /**
+     * @return the name of this strategy, {@code "non-metadata"}
+     */
+    @Override
+    public String getName() {
+        return "non-metadata";
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
index 0ad8be2..340bad4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -18,11 +18,15 @@
  */
 package org.apache.asterix.common.replication;
 
-public interface IReplicationStrategy {
+import java.io.Serializable;
+
+public interface IReplicationStrategy extends Serializable {
 
     /**
      * @param datasetId
      * @return True, if the dataset should be replicated. Otherwise false.
      */
     boolean isMatch(int datasetId);
+
+    String getName();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index 2b97fe7..5278208 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -26,4 +26,9 @@
     public boolean isMatch(int datasetId) {
         return MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
     }
+
+    @Override
+    public String getName() {
+        return "metadata";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
index 8ffaa15..49083c8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -24,4 +24,9 @@
     public boolean isMatch(int datasetId) {
         return false;
     }
+
+    @Override
+    public String getName() {
+        return "none";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
index c4bb74c..95ea055 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -27,21 +27,29 @@
     private final int partition;
     private final String id;
     private volatile InetSocketAddress location;
+    private final String nodeId;
+    private final int storagePartition;
 
-    private ReplicaIdentifier(int partition, InetSocketAddress location) {
+    private ReplicaIdentifier(int partition, String nodeId, InetSocketAddress location) {
         this.partition = partition;
         this.location = location;
+        this.nodeId = nodeId;
+        storagePartition = partition;
         id = partition + "@" + location.getHostString() + ":" + location.getPort();
     }
 
-    public static ReplicaIdentifier of(int partition, InetSocketAddress location) {
-        return new ReplicaIdentifier(partition, location);
+    public static ReplicaIdentifier of(int partition, String nodeId, InetSocketAddress location) {
+        return new ReplicaIdentifier(partition, nodeId, location);
     }
 
     public int getPartition() {
         return partition;
     }
 
+    public int getStoragePartition() {
+        return storagePartition;
+    }
+
     public InetSocketAddress getLocation() {
         return location;
     }
@@ -52,6 +60,10 @@
         return location;
     }
 
+    public String getNodeId() {
+        return nodeId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index d803756..06650b0 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -78,7 +78,7 @@
     }
 
     public synchronized Optional<IPartitionReplica> getPartitionReplica(int partition) {
-        return replicas.stream().filter(replica -> replica.getIdentifier().getPartition() == partition
+        return replicas.stream().filter(replica -> replica.getIdentifier().getStoragePartition() == partition
                 && replica.getStatus() == IPartitionReplica.PartitionReplicaStatus.IN_SYNC).findAny();
     }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index dd953c4..aadea22 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -101,7 +101,8 @@
 
     private void process(IReplicationJob job) {
         try {
-            if (skip(job)) {
+            DatasetResourceReference resourceRef = getJobDatasetResource(job);
+            if (skip(resourceRef)) {
                 return;
             }
             synchronized (transferLock) {
@@ -109,11 +110,11 @@
                     return;
                 }
                 final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
-                final int indexPartition = getJobPartition(job);
+                final int indexPartition = resourceRef.getPartitionNum();
                 for (ReplicationDestination dest : destinations) {
                     try {
                         Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
-                        if (!partitionReplica.isPresent()) {
+                        if (partitionReplica.isEmpty()) {
                             continue;
                         }
                         PartitionReplica replica = (PartitionReplica) partitionReplica.get();
@@ -129,23 +130,20 @@
         }
     }
 
-    private boolean skip(IReplicationJob job) {
+    private boolean skip(DatasetResourceReference indexFileRef) {
+        return indexFileRef == null || !replicationStrategy.isMatch(indexFileRef.getDatasetId());
+    }
+
+    private DatasetResourceReference getJobDatasetResource(IReplicationJob job) {
         try {
-            final String fileToReplicate = job.getAnyFile();
-            final Optional<DatasetResourceReference> indexFileRefOpt =
-                    resourceRepository.getLocalResourceReference(fileToReplicate);
-            if (!indexFileRefOpt.isPresent()) {
-                LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
-                return true;
-            }
-            return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+            return resourceRepository.getLocalResourceReference(job.getAnyFile()).orElse(null);
         } catch (HyracksDataException e) {
             throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
         }
     }
 
-    private int getJobPartition(IReplicationJob job) {
-        return ResourceReference.of(job.getAnyFile()).getPartitionNum();
+    private ResourceReference getJobPartition(IReplicationJob job) {
+        return ResourceReference.of(job.getAnyFile());
     }
 
     private void closeChannels() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6a23ae6..a805d82 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -128,6 +128,7 @@
     public void replicate(ILogRecord logRecord) throws InterruptedException {
         if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
             synchronized (destinations) {
+                //TODO filter destinations
                 ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
             }
         }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 767eb76..e0f9a8f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -59,6 +59,8 @@
         PersistentLocalResourceRepository resRepo =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         final IIOManager ioManager = appCtx.getIoManager();
+        //TODO this should consider replication strategy to avoid checkpointing metadata datasets
+        // when only other datasets should be checkpointed
         final Collection<LocalResource> partitionResources = resRepo.getPartitionResources(partition).values();
         final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
         for (LocalResource ls : partitionResources) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index cff12de..91bc35b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -48,11 +48,12 @@
     public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException {
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        //TODO do we need to sync such tasks?
         localResourceRepository.cleanup(partition);
-        final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
-        final List<String> partitionResources =
-                localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
-                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+        //TODO need to include checksum of each file too
+        IReplicationStrategy strategy = appCtx.getReplicationManager().getReplicationStrategy();
+        final List<String> partitionResources = localResourceRepository.getPartitionReplicatedFiles(partition, strategy)
+                .stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
         final PartitionResourcesListResponse response =
                 new PartitionResourcesListResponse(partition, partitionResources);
         ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
new file mode 100644
index 0000000..e73bcb2
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * A CC-addressed message announcing that a node became the master of a partition. Handling it updates the
+ * partition in the CC's {@link IClusterStateManager} with that node and then refreshes the cluster state.
+ */
+public class ReplicaPromotionMessage implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final int partition;
+
+    /**
+     * @param nodeId the id of the node that is the new master of {@code partition}
+     * @param partition the id of the partition whose master changed
+     */
+    public ReplicaPromotionMessage(String nodeId, int partition) {
+        this.nodeId = nodeId;
+        this.partition = partition;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        LOGGER.info("Partition {} master changed, new master {}", partition, nodeId);
+        IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
+        // update the partition with its new node first, then refresh so the state reflects the change
+        clusterStateManager.updateClusterPartition(partition, nodeId, true);
+        clusterStateManager.refreshState();
+    }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index b47fd39..a01a2ba 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -26,7 +26,6 @@
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
@@ -49,14 +48,13 @@
     }
 
     public void sync() throws IOException {
-        final int partition = replica.getIdentifier().getPartition();
+        final int partition = replica.getIdentifier().getStoragePartition();
         final Set<String> replicaFiles = getReplicaFiles(partition);
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
-        final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
-        final Set<String> masterFiles =
-                localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
-                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+        final Set<String> masterFiles = localResourceRepository
+                .getPartitionReplicatedFiles(partition, appCtx.getReplicationManager().getReplicationStrategy())
+                .stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
         // find files on master and not on replica
         final List<String> replicaMissingFiles =
                 masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 261236c..313263e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -60,15 +60,15 @@
 
     private void syncFiles() throws IOException {
         final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
-        // flush replicated dataset to generate disk component for any remaining in-memory components
         final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        // flush replicated dataset to generate disk component for any remaining in-memory components
         appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
         waitForReplicatedDatasetsIO();
         fileSync.sync();
     }
 
     private void checkpointReplicaIndexes() throws IOException {
-        final int partition = replica.getIdentifier().getPartition();
+        final int partition = replica.getIdentifier().getStoragePartition();
         CheckpointPartitionIndexesTask task =
                 new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition));
         ReplicationProtocol.sendTo(replica, task);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 9099c88..2576c4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.runtime.utils;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
 import org.apache.asterix.common.api.IConfigValidator;
@@ -101,6 +102,8 @@
     private final IConfigValidator configValidator;
     private final IAdapterFactoryService adapterFactoryService;
 
+    private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
+
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
             INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
@@ -314,4 +317,9 @@
     public IAdapterFactoryService getAdapterFactoryService() {
         return adapterFactoryService;
     }
+
+    @Override
+    public ReentrantReadWriteLock getCompilationLock() {
+        return compilationLock;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..fc99792 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -24,12 +24,14 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -83,6 +85,8 @@
     @Override
     public void setCcAppCtx(ICcApplicationContext appCtx) {
         this.appCtx = appCtx;
+        //TODO we need to keep the node2PartitionsMap in the same state it was in before a failover and only change
+        // a partition's current active node
         node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
         clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
         currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName();
@@ -139,20 +143,30 @@
     }
 
     @Override
-    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
+    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters,
+            Set<Integer> nodePartitions) {
+        // add/remove the node from the participants, then propagate its state to its partitions
         if (active) {
             updateClusterCounters(nodeId, localCounters);
             participantNodes.add(nodeId);
         } else {
             participantNodes.remove(nodeId);
         }
-        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
-        // if this isn't a storage node, it will not have cluster partitions
+        // explicitly supplied partitions are used as-is
         if (nodePartitions != null) {
-            for (ClusterPartition p : nodePartitions) {
-                updateClusterPartition(p.getPartitionId(), nodeId, active);
+            for (Integer partitionId : nodePartitions) {
+                updateClusterPartition(partitionId, nodeId, active);
+            }
+        } else {
+            // no partitions supplied: update every partition whose current active node is this node.
+            // nodeId is the equals() receiver so a null active node id cannot throw (presumably possible
+            // for a partition that was never activated -- TODO confirm against ClusterPartition)
+            List<ClusterPartition> nodeActivePartitions = clusterPartitions.values().stream()
+                    .filter(clusterPartition -> nodeId.equals(clusterPartition.getActiveNodeId()))
+                    .collect(Collectors.toList());
+            for (ClusterPartition partition : nodeActivePartitions) {
+                updateClusterPartition(partition.getPartitionId(), nodeId, active);
             }
         }
     }
 
     @Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 29dedf7..556ab6a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -248,7 +248,7 @@
         }
     }
 
-    private static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath)
+    public static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath)
             throws HyracksDataException {
         String fileName = resourcePath + File.separator + StorageConstants.METADATA_FILE_NAME;
         return ioManager.resolve(fileName);
@@ -603,4 +603,8 @@
     private static boolean isComponentFile(File indexDir, String fileName) {
         return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
     }
+
+    public Path[] getStorageRoots() {
+        return storageRoots;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
index 0a0e026..c7be4ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
@@ -59,7 +59,7 @@
     @Override
     public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException {
         if (cached == null) {
-            cached = new FileReference(ioManager.getIODevices().get(ioDevice), getPath());
+            cached = super.getFileReference(ioManager);
         }
         return cached;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index f54a62c..5166970 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -184,6 +184,9 @@
 
     public synchronized void failNode(String nodeId) throws HyracksException {
         NodeControllerState state = nodeRegistry.get(nodeId);
+        if (state == null) {
+            return;
+        }
         Set<JobId> affectedJobIds = state.getActiveJobIds();
         // Removes the node from node map.
         nodeRegistry.remove(nodeId);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12503
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I3b8de5d3354be36768e3c9da116c3c96a552a867
Gerrit-Change-Number: 12503
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: newchange