You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2021/07/20 00:16:33 UTC

Change in asterixdb[master]: PLEASE EDIT to provide a meaningful commit message!

From Murtadha Hubail <mh...@apache.org>:

Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404 )


Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................

PLEASE EDIT to provide a meaningful commit message!

The following commits from your working branch will be included:

commit 90c270d5aa339b2b8a53be2187d8e6f688e243a2
Author: Murtadha Hubail <mu...@couchbase.com>
Date:   Wed Jul 14 03:53:03 2021 +0300

    WIP: Allow node partitions to specified in config

Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
A asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
30 files changed, 298 insertions(+), 59 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/04/12404/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d7d5994..70b46e0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -31,6 +31,7 @@
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.ReplicationStrategyFactory;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.common.storage.ResourceStorageStats;
@@ -135,6 +136,8 @@
                 final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
                 replicaJson.put("location", toHostPort(replica.getIdentifier().getLocation()));
                 replicaJson.put("status", replica.getStatus().toString());
+                replicaJson.put("nodeId", replica.getIdentifier().getNodeId());
+                replicaJson.put("strategy", replica.getIdentifier().getStrategy().getName());
                 replicasArray.add(replicaJson);
             }
             partitionJson.set("replicas", replicasArray);
@@ -167,11 +170,14 @@
         final String partition = request.getParameter("partition");
         final String host = request.getParameter("host");
         final String port = request.getParameter("port");
-        if (partition == null || host == null || port == null) {
+        final String nodeId = request.getParameter("nodeId");
+        final String strategy = request.getParameter("strategy");
+        if (partition == null || host == null || port == null || strategy == null) {
             return null;
         }
-        final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
-        return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
+        final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.parseInt(port));
+        return ReplicaIdentifier.of(Integer.parseInt(partition), nodeId, replicaAddress,
+                ReplicationStrategyFactory.create(strategy));
     }
 
     private void processPromote(IServletRequest request, IServletResponse response) throws HyracksDataException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..53d0973 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -38,7 +38,6 @@
 import org.apache.asterix.common.api.IPropertiesFactory;
 import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CompilerProperties;
@@ -225,9 +224,9 @@
                 new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
                         virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
         final String nodeId = getServiceContext().getNodeId();
-        final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+        final String[] nodePartitions = nodeProperties.getActivePartitions();
         final Set<Integer> nodePartitionsIds =
-                Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+                Arrays.stream(nodePartitions).map(Integer::parseInt).collect(Collectors.toSet());
         replicaManager = new ReplicaManager(this, nodePartitionsIds);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index ad70cf4..48c0a36 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -30,11 +30,13 @@
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.ReplicaPromotionMessage;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.config.IApplicationConfig;
@@ -89,8 +91,10 @@
 
     @Override
     public synchronized void removeReplica(ReplicaIdentifier id) {
+        //TODO return illegal state?
         if (!replicas.containsKey(id)) {
-            throw new IllegalStateException("replica with id(" + id + ") does not exist");
+            return;
+            //            throw new IllegalStateException("replica with id(" + id + ") does not exist");
         }
         PartitionReplica replica = replicas.remove(id);
         appCtx.getReplicationManager().unregister(replica);
@@ -119,6 +123,17 @@
         final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
         recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
         partitions.add(partition);
+        // TODO should we just persist the takeover success on metakv to prevent and race between CC being active
+        // and the partition being active?
+        ReplicaPromotionMessage message =
+                new ReplicaPromotionMessage(appCtx.getTransactionSubsystem().getId(), partition);
+        INCMessageBroker messageBroker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendMessageToPrimaryCC(message);
+        } catch (Exception e) {
+            //TODO should we persist partition takeover success in metakv?
+            LOGGER.error("failed to notify CC of replica promotion");
+        }
     }
 
     @Override
@@ -147,8 +162,9 @@
         final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
         final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
         for (LocalResource resource : partitionResources.values()) {
-            datasetLifecycleManager.close(resource.getPath());
+            datasetLifecycleManager.closeIfOpen(resource.getPath());
         }
+        datasetLifecycleManager.closePartition(partition);
     }
 
     private boolean isSelf(ReplicaIdentifier id) {
@@ -157,6 +173,6 @@
         int port = appConfig.getInt(NCConfig.Option.REPLICATION_LISTEN_PORT);
 
         final InetSocketAddress replicaAddress = new InetSocketAddress(host, port);
-        return id.equals(ReplicaIdentifier.of(id.getPartition(), replicaAddress));
+        return id.equals(ReplicaIdentifier.of(id.getPartition(), id.getNodeId(), replicaAddress, id.getStrategy()));
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..cacb15a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@
 import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,8 @@
     @Override
     public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
         pendingStartupCompletionNodes.remove(nodeId);
-        clusterManager.updateNodeState(nodeId, false, null);
+        //TODO on node failure, we need to set all of it's active partitions to inactive
+        clusterManager.updateNodeState(nodeId, false, null, null);
         if (nodeId.equals(metadataNodeId)) {
             clusterManager.updateMetadataNode(metadataNodeId, false);
         }
@@ -138,7 +136,8 @@
     private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
         final String nodeId = msg.getNodeId();
         nodeSecretsMap.put(nodeId, msg.getSecrets());
-        List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+        List<INCLifecycleTask> tasks =
+                buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
         RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
@@ -157,7 +156,7 @@
             return;
         }
         if (msg.isSuccess()) {
-            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
+            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getPartitions());
             if (msg.getNodeId().equals(metadataNodeId)) {
                 clusterManager.updateMetadataNode(metadataNodeId, true);
             }
@@ -167,7 +166,8 @@
         }
     }
 
-    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
+            Set<Integer> activePartitions) {
         LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
                 state);
         final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +175,7 @@
             case ACTIVE:
                 return buildActiveNCRegTasks(isMetadataNode);
             case IDLE:
-                return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+                return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
             default:
                 return new ArrayList<>();
         }
@@ -210,15 +210,18 @@
         }
     }
 
-    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
+    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
+            Set<Integer> activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
         if (state == SystemState.CORRUPTED) {
-            // need to perform local recovery for node partitions
-            LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
-                    .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+            // need to perform local recovery for node active partitions
+            //TODO ensure recovery is done for all active partitions before the CheckpointTask since that
+            // might delete any old transaction log files
+            LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
             tasks.add(rt);
         }
+
         if (replicationEnabled) {
             tasks.add(new StartReplicationServiceTask());
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..c70b6bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.replication.message;
 
+import java.util.Set;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@
     private final boolean success;
     private Throwable exception;
     private final NcLocalCounters localCounters;
+    private Set<Integer> partitions;
 
-    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
+    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters,
+            Set<Integer> partitions) {
         this.nodeId = nodeId;
         this.success = success;
         this.localCounters = localCounters;
+        this.partitions = partitions;
     }
 
     @Override
@@ -63,6 +68,10 @@
         return localCounters;
     }
 
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
     @Override
     public MessageType getType() {
         return MessageType.REGISTRATION_TASKS_RESULT;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..870413b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -37,25 +38,27 @@
 public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
     protected final SystemState state;
     protected final String nodeId;
     protected final NodeStatus nodeStatus;
     protected final Map<String, Object> secrets;
+    protected final Set<Integer> activePartitions;
 
     public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
-            Map<String, Object> secretsEphemeral) {
+            Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
         this.state = state;
         this.nodeId = nodeId;
         this.nodeStatus = nodeStatus;
         this.secrets = new HashMap<>(secretsEphemeral);
+        this.activePartitions = activePartitions;
     }
 
     public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
-            Map<String, Object> secretsEphemeral) throws HyracksDataException {
+            Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException {
         try {
-            RegistrationTasksRequestMessage msg =
-                    new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
+            RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+                    systemState, secretsEphemeral, activePartitions);
             ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@
     public Map<String, Object> getSecrets() {
         return secrets;
     }
+
+    public Set<Integer> getActivePartitions() {
+        return activePartitions;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..7848849 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.replication.message;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@
             }
             NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
-            NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+            Set<Integer> partitions = appCtx.getReplicaManager().getPartitions();
+            NCLifecycleTaskReportMessage result =
+                    new NCLifecycleTaskReportMessage(nodeId, success, localCounter, partitions);
             result.setException(exception);
             try {
                 broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3571d88..76eb184 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -42,6 +42,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.active.ActivityState;
@@ -272,6 +273,7 @@
     protected final ICcApplicationContext appCtx;
     protected final SessionOutput sessionOutput;
     protected final SessionConfig sessionConfig;
+    protected final ReentrantReadWriteLock compilationLock;
     protected Dataverse activeDataverse;
     protected final List<FunctionDecl> declaredFunctions;
     protected final ILangCompilationProvider compilationProvider;
@@ -290,6 +292,7 @@
         this.appCtx = appCtx;
         this.lockManager = appCtx.getMetadataLockManager();
         this.lockUtil = appCtx.getMetadataLockUtil();
+        this.compilationLock = appCtx.getCompilationLock();
         this.statements = statements;
         this.sessionOutput = output;
         this.sessionConfig = output.config();
@@ -3367,12 +3370,14 @@
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() throws AlgebricksException {
+                compilationLock.readLock().lock();
                 lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
             }
 
             @Override
             public void unlock() {
                 metadataProvider.getLocks().unlock();
+                compilationLock.readLock().unlock();
             }
         };
         final IStatementCompiler compiler = () -> {
@@ -3962,6 +3967,7 @@
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
+                compilationLock.readLock().lock();
             }
 
             @Override
@@ -3969,6 +3975,7 @@
                 metadataProvider.getLocks().unlock();
                 // release external datasets' locks acquired during compilation of the query
                 ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+                compilationLock.readLock().unlock();
             }
         };
         final IStatementCompiler compiler = () -> {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..89f7741 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -36,6 +36,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -299,8 +300,10 @@
         final Map httpSecrets =
                 apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
                         : Collections.emptyMap();
+        Set<Integer> activePartitions = Arrays.stream(runtimeContext.getNodeProperties().getActivePartitions())
+                .map(Integer::parseInt).collect(Collectors.toSet());
         RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
-                currentStatus, systemState, httpSecrets);
+                currentStatus, systemState, httpSecrets, activePartitions);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..370b330 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -200,7 +200,8 @@
 
     private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
             throws HyracksDataException {
-        NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+        NCLifecycleTaskReportMessage msg =
+                new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), Collections.emptySet());
         applicationContext.getNcLifecycleCoordinator().process(msg);
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index b03af55..48bada3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -169,4 +169,8 @@
      * @return the current datasets io stats
      */
     StorageIOStats getDatasetsIOStats();
+
+    void closeIfOpen(String resourcePath) throws HyracksDataException;
+
+    void closePartition(int partitionId);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc99421..b98cbe5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -71,7 +71,10 @@
 
     @Override
     public ClusterPartition clone() {
-        return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        clone.setActiveNodeId(activeNodeId);
+        clone.setActive(active);
+        return clone;
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..94f80cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@
      * @param nodeId
      * @param active
      * @param ncLocalCounters
+     * @param partitions
      * @throws HyracksDataException
      */
-    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
+    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> partitions)
+            throws HyracksDataException;
 
     /**
      * Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..efc9b6a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,11 @@
         STARTING_PARTITION_ID(
                 OptionTypes.INTEGER,
                 -1,
-                "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
+                "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
+        ACTIVE_PARTITIONS(
+                OptionTypes.STRING_ARRAY,
+                new String[] {},
+                "List of partitions this node is currently the master of");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -95,7 +99,7 @@
 
         @Override
         public boolean hidden() {
-            return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+            return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS;
         }
 
     }
@@ -115,4 +119,8 @@
     public String getTxnLogDir() {
         return accessor.getString(Option.TXN_LOG_DIR);
     }
+
+    public String[] getActivePartitions() {
+        return accessor.getStringArray(Option.ACTIVE_PARTITIONS);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5d1a8e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -132,6 +132,7 @@
         }
         stores.put(ncId, nodeStores);
         nodePartitionsMap.put(ncId, nodePartitions);
+        LOGGER.info("Node partitions {}", nodeCfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS));
     }
 
     private void loadAsterixBuildProperties() throws AsterixException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b2f4034..00a30e5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -559,6 +559,51 @@
         return stats;
     }
 
+    @Override
+    public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
+        validateDatasetLifecycleManagerState();
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+        DatasetResource dsr = datasets.get(did);
+        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+
+        if (dsr == null || iInfo == null) {
+            return;
+        }
+
+        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+        if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+            if (LOGGER.isErrorEnabled()) {
+                final String logMsg = String.format(
+                        "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+                        resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+                LOGGER.error(logMsg);
+            }
+            throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+                    StoragePathUtil.getIndexNameFromPath(resourcePath));
+        }
+
+        // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        dsInfo.waitForIO();
+        closeIndex(iInfo);
+        dsInfo.removeIndex(resourceID);
+        synchronized (dsInfo) {
+            if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+                    && !dsInfo.isExternal()) {
+                removeDatasetFromCache(dsInfo.getDatasetID());
+            }
+        }
+    }
+
+    @Override
+    public synchronized void closePartition(int partitionId) {
+        for (DatasetResource ds : datasets.values()) {
+            ds.removePartition(partitionId);
+        }
+    }
+
     private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
         if (indexInfo.isOpen()) {
             ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 54e1976..db9eabb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -182,4 +182,10 @@
     public boolean isMetadataDataset() {
         return MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID());
     }
+
+    public void removePartition(int partitionId) {
+        datasetPrimaryOpTrackers.remove(partitionId);
+        datasetComponentIdGenerators.remove(partitionId);
+        datasetRateLimiters.remove(partitionId);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index ad90814..72fb32c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IMetadataLockManager;
@@ -149,4 +151,11 @@
      * @return the adapter factory service
      */
     IAdapterFactoryService getAdapterFactoryService();
+
+    /**
+     * Gets the compilation lock
+     *
+     * @return the compilation lock
+     */
+    ReentrantReadWriteLock getCompilationLock();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
index c969777..1dfc2e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
@@ -24,4 +24,9 @@
     public boolean isMatch(int datasetId) {
         return true;
     }
+
+    @Override
+    public String getName() {
+        return "all";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
index 0ad8be2..a74b621 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -25,4 +25,6 @@
      * @return True, if the dataset should be replicated. Otherwise false.
      */
     boolean isMatch(int datasetId);
+
+    String getName();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index 2b97fe7..5278208 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -26,4 +26,9 @@
     public boolean isMatch(int datasetId) {
         return MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
     }
+
+    @Override
+    public String getName() {
+        return "metadata";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
index 8ffaa15..49083c8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -24,4 +24,9 @@
     public boolean isMatch(int datasetId) {
         return false;
     }
+
+    @Override
+    public String getName() {
+        return "none";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
index c4bb74c..af285be 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -20,22 +20,28 @@
 
 import java.net.InetSocketAddress;
 
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.hyracks.util.NetworkUtil;
 
 public class ReplicaIdentifier {
 
     private final int partition;
     private final String id;
+    private final IReplicationStrategy strategy;
     private volatile InetSocketAddress location;
+    private final String nodeId;
 
-    private ReplicaIdentifier(int partition, InetSocketAddress location) {
+    private ReplicaIdentifier(int partition, String nodeId, InetSocketAddress location, IReplicationStrategy strategy) {
         this.partition = partition;
         this.location = location;
-        id = partition + "@" + location.getHostString() + ":" + location.getPort();
+        this.nodeId = nodeId;
+        this.strategy = strategy;
+        id = partition + "@" + location.getHostString() + ":" + location.getPort() + ":" + strategy.getName();
     }
 
-    public static ReplicaIdentifier of(int partition, InetSocketAddress location) {
-        return new ReplicaIdentifier(partition, location);
+    public static ReplicaIdentifier of(int partition, String nodeId, InetSocketAddress location,
+            IReplicationStrategy strategy) {
+        return new ReplicaIdentifier(partition, nodeId, location, strategy);
     }
 
     public int getPartition() {
@@ -52,6 +58,14 @@
         return location;
     }
 
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public IReplicationStrategy getStrategy() {
+        return strategy;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index dd953c4..9bf0337 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -101,7 +101,8 @@
 
     private void process(IReplicationJob job) {
         try {
-            if (skip(job)) {
+            DatasetResourceReference resourceRef = getJobDatasetResource(job);
+            if (skip(resourceRef)) {
                 return;
             }
             synchronized (transferLock) {
@@ -109,15 +110,17 @@
                     return;
                 }
                 final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
-                final int indexPartition = getJobPartition(job);
+                final int indexPartition = resourceRef.getPartitionNum();
                 for (ReplicationDestination dest : destinations) {
                     try {
                         Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
-                        if (!partitionReplica.isPresent()) {
+                        if (partitionReplica.isEmpty()) {
                             continue;
                         }
                         PartitionReplica replica = (PartitionReplica) partitionReplica.get();
-                        synchronizer.sync(replica);
+                        if (replica.getIdentifier().getStrategy().isMatch(resourceRef.getDatasetId())) {
+                            synchronizer.sync(replica);
+                        }
                     } catch (Exception e) {
                         handleFailure(dest, e);
                     }
@@ -129,23 +132,20 @@
         }
     }
 
-    private boolean skip(IReplicationJob job) {
+    private boolean skip(DatasetResourceReference indexFileRef) {
+        return indexFileRef == null || !replicationStrategy.isMatch(indexFileRef.getDatasetId());
+    }
+
+    private DatasetResourceReference getJobDatasetResource(IReplicationJob job) {
         try {
-            final String fileToReplicate = job.getAnyFile();
-            final Optional<DatasetResourceReference> indexFileRefOpt =
-                    resourceRepository.getLocalResourceReference(fileToReplicate);
-            if (!indexFileRefOpt.isPresent()) {
-                LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
-                return true;
-            }
-            return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+            return resourceRepository.getLocalResourceReference(job.getAnyFile()).orElse(null);
         } catch (HyracksDataException e) {
             throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
         }
     }
 
-    private int getJobPartition(IReplicationJob job) {
-        return ResourceReference.of(job.getAnyFile()).getPartitionNum();
+    private ResourceReference getJobPartition(IReplicationJob job) {
+        return ResourceReference.of(job.getAnyFile());
     }
 
     private void closeChannels() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6a23ae6..a805d82 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -128,6 +128,7 @@
     public void replicate(ILogRecord logRecord) throws InterruptedException {
         if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
             synchronized (destinations) {
+                //TODO filter destinations
                 ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
             }
         }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
new file mode 100644
index 0000000..e73bcb2
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ReplicaPromotionMessage implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private int partition;
+
+    public ReplicaPromotionMessage(String nodeId, int partition) {
+        this.nodeId = nodeId;
+        this.partition = partition;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        LOGGER.info("Partition {} master changed, new master {}", partition, nodeId);
+        IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
+        clusterStateManager.updateClusterPartition(partition, nodeId, true);
+        clusterStateManager.refreshState();
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 9099c88..2576c4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.runtime.utils;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
 import org.apache.asterix.common.api.IConfigValidator;
@@ -101,6 +102,8 @@
     private final IConfigValidator configValidator;
     private final IAdapterFactoryService adapterFactoryService;
 
+    private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
+
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
             INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
@@ -314,4 +317,9 @@
     public IAdapterFactoryService getAdapterFactoryService() {
         return adapterFactoryService;
     }
+
+    @Override
+    public ReentrantReadWriteLock getCompilationLock() {
+        return compilationLock;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..fc99792 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -24,12 +24,14 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -83,6 +85,8 @@
     @Override
     public void setCcAppCtx(ICcApplicationContext appCtx) {
         this.appCtx = appCtx;
+        //TODO we need to maintain the node2PartitionsMap in the same state before a failover and only change the
+        // a partition's current active node
         node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
         clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
         currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName();
@@ -139,20 +143,33 @@
     }
 
     @Override
-    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
+    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters,
+            Set<Integer> nodePartitions) {
         if (active) {
             updateClusterCounters(nodeId, localCounters);
             participantNodes.add(nodeId);
         } else {
             participantNodes.remove(nodeId);
         }
-        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
-        // if this isn't a storage node, it will not have cluster partitions
+        //        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
         if (nodePartitions != null) {
-            for (ClusterPartition p : nodePartitions) {
-                updateClusterPartition(p.getPartitionId(), nodeId, active);
+            for (Integer partitionId : nodePartitions) {
+                updateClusterPartition(partitionId, nodeId, active);
+            }
+        } else {
+            List<ClusterPartition> nodeActivePartitions = clusterPartitions.values().stream()
+                    .filter(clusterPartition -> clusterPartition.getActiveNodeId().equals(nodeId))
+                    .collect(Collectors.toList());
+            for (ClusterPartition partition : nodeActivePartitions) {
+                updateClusterPartition(partition.getPartitionId(), nodeId, active);
             }
         }
+        // if this isn't a storage node, it will not have cluster partitions
+        //        if (nodePartitions != null) {
+        //            for (ClusterPartition p : nodePartitions) {
+        //                updateClusterPartition(p.getPartitionId(), nodeId, active);
+        //            }
+        //        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
index 0a0e026..c7be4ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
@@ -59,7 +59,7 @@
     @Override
     public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException {
         if (cached == null) {
-            cached = new FileReference(ioManager.getIODevices().get(ioDevice), getPath());
+            cached = super.getFileReference(ioManager);
         }
         return cached;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index f54a62c..5166970 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -184,6 +184,9 @@
 
     public synchronized void failNode(String nodeId) throws HyracksException {
         NodeControllerState state = nodeRegistry.get(nodeId);
+        if (state == null) {
+            return;
+        }
         Set<JobId> affectedJobIds = state.getActiveJobIds();
         // Removes the node from node map.
         nodeRegistry.remove(nodeId);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
Gerrit-Change-Number: 12404
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: newchange

Change in asterixdb[master]: Dynamic Partitions Topology PoC

Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Murtadha Hubail <mh...@apache.org>:

Murtadha Hubail has abandoned this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404 )

Change subject: Dynamic Partitions Topology PoC
......................................................................


Abandoned
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
Gerrit-Change-Number: 12404
Gerrit-PatchSet: 3
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-Reviewer: Ali Alsuliman <al...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: abandon

Change in asterixdb[master]: PLEASE EDIT to provide a meaningful commit message!

Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Murtadha Hubail <mh...@apache.org>:

Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404 )


Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................

PLEASE EDIT to provide a meaningful commit message!

The following commits from your working branch will be included:

commit 90c270d5aa339b2b8a53be2187d8e6f688e243a2
Author: Murtadha Hubail <mu...@couchbase.com>
Date:   Wed Jul 14 03:53:03 2021 +0300

    WIP: Allow node partitions to specified in config

Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
A asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
30 files changed, 298 insertions(+), 59 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/04/12404/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d7d5994..70b46e0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -31,6 +31,7 @@
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.ReplicationStrategyFactory;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.common.storage.ResourceStorageStats;
@@ -135,6 +136,8 @@
                 final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
                 replicaJson.put("location", toHostPort(replica.getIdentifier().getLocation()));
                 replicaJson.put("status", replica.getStatus().toString());
+                replicaJson.put("nodeId", replica.getIdentifier().getNodeId());
+                replicaJson.put("strategy", replica.getIdentifier().getStrategy().getName());
                 replicasArray.add(replicaJson);
             }
             partitionJson.set("replicas", replicasArray);
@@ -167,11 +170,14 @@
         final String partition = request.getParameter("partition");
         final String host = request.getParameter("host");
         final String port = request.getParameter("port");
-        if (partition == null || host == null || port == null) {
+        final String nodeId = request.getParameter("nodeId");
+        final String strategy = request.getParameter("strategy");
+        if (partition == null || host == null || port == null || strategy == null) {
             return null;
         }
-        final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
-        return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
+        final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.parseInt(port));
+        return ReplicaIdentifier.of(Integer.parseInt(partition), nodeId, replicaAddress,
+                ReplicationStrategyFactory.create(strategy));
     }
 
     private void processPromote(IServletRequest request, IServletResponse response) throws HyracksDataException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..53d0973 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -38,7 +38,6 @@
 import org.apache.asterix.common.api.IPropertiesFactory;
 import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CompilerProperties;
@@ -225,9 +224,9 @@
                 new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
                         virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
         final String nodeId = getServiceContext().getNodeId();
-        final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+        final String[] nodePartitions = nodeProperties.getActivePartitions();
         final Set<Integer> nodePartitionsIds =
-                Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+                Arrays.stream(nodePartitions).map(Integer::parseInt).collect(Collectors.toSet());
         replicaManager = new ReplicaManager(this, nodePartitionsIds);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index ad70cf4..48c0a36 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -30,11 +30,13 @@
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.ReplicaPromotionMessage;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.config.IApplicationConfig;
@@ -89,8 +91,10 @@
 
     @Override
     public synchronized void removeReplica(ReplicaIdentifier id) {
+        //TODO return illegal state?
         if (!replicas.containsKey(id)) {
-            throw new IllegalStateException("replica with id(" + id + ") does not exist");
+            return;
+            //            throw new IllegalStateException("replica with id(" + id + ") does not exist");
         }
         PartitionReplica replica = replicas.remove(id);
         appCtx.getReplicationManager().unregister(replica);
@@ -119,6 +123,17 @@
         final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
         recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
         partitions.add(partition);
+        // TODO should we just persist the takeover success on metakv to prevent and race between CC being active
+        // and the partition being active?
+        ReplicaPromotionMessage message =
+                new ReplicaPromotionMessage(appCtx.getTransactionSubsystem().getId(), partition);
+        INCMessageBroker messageBroker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendMessageToPrimaryCC(message);
+        } catch (Exception e) {
+            //TODO should we persist partition takeover success in metakv?
+            LOGGER.error("failed to notify CC of replica promotion");
+        }
     }
 
     @Override
@@ -147,8 +162,9 @@
         final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
         final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
         for (LocalResource resource : partitionResources.values()) {
-            datasetLifecycleManager.close(resource.getPath());
+            datasetLifecycleManager.closeIfOpen(resource.getPath());
         }
+        datasetLifecycleManager.closePartition(partition);
     }
 
     private boolean isSelf(ReplicaIdentifier id) {
@@ -157,6 +173,6 @@
         int port = appConfig.getInt(NCConfig.Option.REPLICATION_LISTEN_PORT);
 
         final InetSocketAddress replicaAddress = new InetSocketAddress(host, port);
-        return id.equals(ReplicaIdentifier.of(id.getPartition(), replicaAddress));
+        return id.equals(ReplicaIdentifier.of(id.getPartition(), id.getNodeId(), replicaAddress, id.getStrategy()));
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..cacb15a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@
 import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,8 @@
     @Override
     public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
         pendingStartupCompletionNodes.remove(nodeId);
-        clusterManager.updateNodeState(nodeId, false, null);
+        //TODO on node failure, we need to set all of it's active partitions to inactive
+        clusterManager.updateNodeState(nodeId, false, null, null);
         if (nodeId.equals(metadataNodeId)) {
             clusterManager.updateMetadataNode(metadataNodeId, false);
         }
@@ -138,7 +136,8 @@
     private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
         final String nodeId = msg.getNodeId();
         nodeSecretsMap.put(nodeId, msg.getSecrets());
-        List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+        List<INCLifecycleTask> tasks =
+                buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
         RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
@@ -157,7 +156,7 @@
             return;
         }
         if (msg.isSuccess()) {
-            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
+            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getPartitions());
             if (msg.getNodeId().equals(metadataNodeId)) {
                 clusterManager.updateMetadataNode(metadataNodeId, true);
             }
@@ -167,7 +166,8 @@
         }
     }
 
-    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
+            Set<Integer> activePartitions) {
         LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
                 state);
         final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +175,7 @@
             case ACTIVE:
                 return buildActiveNCRegTasks(isMetadataNode);
             case IDLE:
-                return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+                return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
             default:
                 return new ArrayList<>();
         }
@@ -210,15 +210,18 @@
         }
     }
 
-    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
+    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
+            Set<Integer> activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
         if (state == SystemState.CORRUPTED) {
-            // need to perform local recovery for node partitions
-            LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
-                    .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+            // need to perform local recovery for node active partitions
+            //TODO ensure recovery is done for all active partitions before the CheckpointTask since that
+            // might delete any old transaction log files
+            LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
             tasks.add(rt);
         }
+
         if (replicationEnabled) {
             tasks.add(new StartReplicationServiceTask());
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..c70b6bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.replication.message;
 
+import java.util.Set;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@
     private final boolean success;
     private Throwable exception;
     private final NcLocalCounters localCounters;
+    private Set<Integer> partitions;
 
-    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
+    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters,
+            Set<Integer> partitions) {
         this.nodeId = nodeId;
         this.success = success;
         this.localCounters = localCounters;
+        this.partitions = partitions;
     }
 
     @Override
@@ -63,6 +68,10 @@
         return localCounters;
     }
 
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
     @Override
     public MessageType getType() {
         return MessageType.REGISTRATION_TASKS_RESULT;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..870413b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -37,25 +38,27 @@
 public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
     protected final SystemState state;
     protected final String nodeId;
     protected final NodeStatus nodeStatus;
     protected final Map<String, Object> secrets;
+    protected final Set<Integer> activePartitions;
 
     public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
-            Map<String, Object> secretsEphemeral) {
+            Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
         this.state = state;
         this.nodeId = nodeId;
         this.nodeStatus = nodeStatus;
         this.secrets = new HashMap<>(secretsEphemeral);
+        this.activePartitions = activePartitions;
     }
 
     public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
-            Map<String, Object> secretsEphemeral) throws HyracksDataException {
+            Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException {
         try {
-            RegistrationTasksRequestMessage msg =
-                    new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
+            RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+                    systemState, secretsEphemeral, activePartitions);
             ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@
     public Map<String, Object> getSecrets() {
         return secrets;
     }
+
+    public Set<Integer> getActivePartitions() {
+        return activePartitions;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..7848849 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.replication.message;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@
             }
             NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
-            NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+            Set<Integer> partitions = appCtx.getReplicaManager().getPartitions();
+            NCLifecycleTaskReportMessage result =
+                    new NCLifecycleTaskReportMessage(nodeId, success, localCounter, partitions);
             result.setException(exception);
             try {
                 broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3571d88..76eb184 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -42,6 +42,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.active.ActivityState;
@@ -272,6 +273,7 @@
     protected final ICcApplicationContext appCtx;
     protected final SessionOutput sessionOutput;
     protected final SessionConfig sessionConfig;
+    protected final ReentrantReadWriteLock compilationLock;
     protected Dataverse activeDataverse;
     protected final List<FunctionDecl> declaredFunctions;
     protected final ILangCompilationProvider compilationProvider;
@@ -290,6 +292,7 @@
         this.appCtx = appCtx;
         this.lockManager = appCtx.getMetadataLockManager();
         this.lockUtil = appCtx.getMetadataLockUtil();
+        this.compilationLock = appCtx.getCompilationLock();
         this.statements = statements;
         this.sessionOutput = output;
         this.sessionConfig = output.config();
@@ -3367,12 +3370,14 @@
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() throws AlgebricksException {
+                compilationLock.readLock().lock();
                 lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
             }
 
             @Override
             public void unlock() {
                 metadataProvider.getLocks().unlock();
+                compilationLock.readLock().unlock();
             }
         };
         final IStatementCompiler compiler = () -> {
@@ -3962,6 +3967,7 @@
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
+                compilationLock.readLock().lock();
             }
 
             @Override
@@ -3969,6 +3975,7 @@
                 metadataProvider.getLocks().unlock();
                 // release external datasets' locks acquired during compilation of the query
                 ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+                compilationLock.readLock().unlock();
             }
         };
         final IStatementCompiler compiler = () -> {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..89f7741 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -36,6 +36,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -299,8 +300,10 @@
         final Map httpSecrets =
                 apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
                         : Collections.emptyMap();
+        Set<Integer> activePartitions = Arrays.stream(runtimeContext.getNodeProperties().getActivePartitions())
+                .map(Integer::parseInt).collect(Collectors.toSet());
         RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
-                currentStatus, systemState, httpSecrets);
+                currentStatus, systemState, httpSecrets, activePartitions);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..370b330 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -200,7 +200,8 @@
 
     private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
             throws HyracksDataException {
-        NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+        NCLifecycleTaskReportMessage msg =
+                new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), Collections.emptySet());
         applicationContext.getNcLifecycleCoordinator().process(msg);
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index b03af55..48bada3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -169,4 +169,8 @@
      * @return the current datasets io stats
      */
     StorageIOStats getDatasetsIOStats();
+
+    void closeIfOpen(String resourcePath) throws HyracksDataException;
+
+    void closePartition(int partitionId);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc99421..b98cbe5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -71,7 +71,10 @@
 
     @Override
     public ClusterPartition clone() {
-        return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        clone.setActiveNodeId(activeNodeId);
+        clone.setActive(active);
+        return clone;
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..94f80cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@
      * @param nodeId
      * @param active
      * @param ncLocalCounters
+     * @param partitions
      * @throws HyracksDataException
      */
-    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
+    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> partitions)
+            throws HyracksDataException;
 
     /**
      * Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..efc9b6a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,11 @@
         STARTING_PARTITION_ID(
                 OptionTypes.INTEGER,
                 -1,
-                "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
+                "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
+        ACTIVE_PARTITIONS(
+                OptionTypes.STRING_ARRAY,
+                new String[] {},
+                "List of partitions this node is currently the master of");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -95,7 +99,7 @@
 
         @Override
         public boolean hidden() {
-            return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+            return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS;
         }
 
     }
@@ -115,4 +119,8 @@
     public String getTxnLogDir() {
         return accessor.getString(Option.TXN_LOG_DIR);
     }
+
+    public String[] getActivePartitions() {
+        return accessor.getStringArray(Option.ACTIVE_PARTITIONS);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5d1a8e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -132,6 +132,7 @@
         }
         stores.put(ncId, nodeStores);
         nodePartitionsMap.put(ncId, nodePartitions);
+        LOGGER.info("Node partitions {}", nodeCfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS));
     }
 
     private void loadAsterixBuildProperties() throws AsterixException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b2f4034..00a30e5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -559,6 +559,51 @@
         return stats;
     }
 
+    @Override
+    public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
+        validateDatasetLifecycleManagerState();
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+        DatasetResource dsr = datasets.get(did);
+        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+
+        if (dsr == null || iInfo == null) {
+            return;
+        }
+
+        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+        if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+            if (LOGGER.isErrorEnabled()) {
+                final String logMsg = String.format(
+                        "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+                        resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+                LOGGER.error(logMsg);
+            }
+            throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+                    StoragePathUtil.getIndexNameFromPath(resourcePath));
+        }
+
+        // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        dsInfo.waitForIO();
+        closeIndex(iInfo);
+        dsInfo.removeIndex(resourceID);
+        synchronized (dsInfo) {
+            if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+                    && !dsInfo.isExternal()) {
+                removeDatasetFromCache(dsInfo.getDatasetID());
+            }
+        }
+    }
+
+    @Override
+    public synchronized void closePartition(int partitionId) {
+        for (DatasetResource ds : datasets.values()) {
+            ds.removePartition(partitionId);
+        }
+    }
+
     private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
         if (indexInfo.isOpen()) {
             ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 54e1976..db9eabb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -182,4 +182,10 @@
     public boolean isMetadataDataset() {
         return MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID());
     }
+
+    public void removePartition(int partitionId) {
+        datasetPrimaryOpTrackers.remove(partitionId);
+        datasetComponentIdGenerators.remove(partitionId);
+        datasetRateLimiters.remove(partitionId);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index ad90814..72fb32c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IMetadataLockManager;
@@ -149,4 +151,11 @@
      * @return the adapter factory service
      */
     IAdapterFactoryService getAdapterFactoryService();
+
+    /**
+     * Gets the compilation lock
+     *
+     * @return the compilation lock
+     */
+    ReentrantReadWriteLock getCompilationLock();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
index c969777..1dfc2e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
@@ -24,4 +24,9 @@
     public boolean isMatch(int datasetId) {
         return true;
     }
+
+    @Override
+    public String getName() {
+        return "all";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
index 0ad8be2..a74b621 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -25,4 +25,6 @@
      * @return True, if the dataset should be replicated. Otherwise false.
      */
     boolean isMatch(int datasetId);
+
+    String getName();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index 2b97fe7..5278208 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -26,4 +26,9 @@
     public boolean isMatch(int datasetId) {
         return MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
     }
+
+    @Override
+    public String getName() {
+        return "metadata";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
index 8ffaa15..49083c8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -24,4 +24,9 @@
     public boolean isMatch(int datasetId) {
         return false;
     }
+
+    @Override
+    public String getName() {
+        return "none";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
index c4bb74c..af285be 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -20,22 +20,28 @@
 
 import java.net.InetSocketAddress;
 
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.hyracks.util.NetworkUtil;
 
 public class ReplicaIdentifier {
 
     private final int partition;
     private final String id;
+    private final IReplicationStrategy strategy;
     private volatile InetSocketAddress location;
+    private final String nodeId;
 
-    private ReplicaIdentifier(int partition, InetSocketAddress location) {
+    private ReplicaIdentifier(int partition, String nodeId, InetSocketAddress location, IReplicationStrategy strategy) {
         this.partition = partition;
         this.location = location;
-        id = partition + "@" + location.getHostString() + ":" + location.getPort();
+        this.nodeId = nodeId;
+        this.strategy = strategy;
+        id = partition + "@" + location.getHostString() + ":" + location.getPort() + ":" + strategy.getName();
     }
 
-    public static ReplicaIdentifier of(int partition, InetSocketAddress location) {
-        return new ReplicaIdentifier(partition, location);
+    public static ReplicaIdentifier of(int partition, String nodeId, InetSocketAddress location,
+            IReplicationStrategy strategy) {
+        return new ReplicaIdentifier(partition, nodeId, location, strategy);
     }
 
     public int getPartition() {
@@ -52,6 +58,14 @@
         return location;
     }
 
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public IReplicationStrategy getStrategy() {
+        return strategy;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index dd953c4..9bf0337 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -101,7 +101,8 @@
 
     private void process(IReplicationJob job) {
         try {
-            if (skip(job)) {
+            DatasetResourceReference resourceRef = getJobDatasetResource(job);
+            if (skip(resourceRef)) {
                 return;
             }
             synchronized (transferLock) {
@@ -109,15 +110,17 @@
                     return;
                 }
                 final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
-                final int indexPartition = getJobPartition(job);
+                final int indexPartition = resourceRef.getPartitionNum();
                 for (ReplicationDestination dest : destinations) {
                     try {
                         Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
-                        if (!partitionReplica.isPresent()) {
+                        if (partitionReplica.isEmpty()) {
                             continue;
                         }
                         PartitionReplica replica = (PartitionReplica) partitionReplica.get();
-                        synchronizer.sync(replica);
+                        if (replica.getIdentifier().getStrategy().isMatch(resourceRef.getDatasetId())) {
+                            synchronizer.sync(replica);
+                        }
                     } catch (Exception e) {
                         handleFailure(dest, e);
                     }
@@ -129,23 +132,20 @@
         }
     }
 
-    private boolean skip(IReplicationJob job) {
+    private boolean skip(DatasetResourceReference indexFileRef) {
+        return indexFileRef == null || !replicationStrategy.isMatch(indexFileRef.getDatasetId());
+    }
+
+    private DatasetResourceReference getJobDatasetResource(IReplicationJob job) {
         try {
-            final String fileToReplicate = job.getAnyFile();
-            final Optional<DatasetResourceReference> indexFileRefOpt =
-                    resourceRepository.getLocalResourceReference(fileToReplicate);
-            if (!indexFileRefOpt.isPresent()) {
-                LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
-                return true;
-            }
-            return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+            return resourceRepository.getLocalResourceReference(job.getAnyFile()).orElse(null);
         } catch (HyracksDataException e) {
             throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
         }
     }
 
-    private int getJobPartition(IReplicationJob job) {
-        return ResourceReference.of(job.getAnyFile()).getPartitionNum();
+    private ResourceReference getJobPartition(IReplicationJob job) {
+        return ResourceReference.of(job.getAnyFile());
     }
 
     private void closeChannels() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6a23ae6..a805d82 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -128,6 +128,7 @@
     public void replicate(ILogRecord logRecord) throws InterruptedException {
         if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
             synchronized (destinations) {
+                //TODO filter destinations
                 ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
             }
         }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
new file mode 100644
index 0000000..e73bcb2
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ReplicaPromotionMessage implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private int partition;
+
+    public ReplicaPromotionMessage(String nodeId, int partition) {
+        this.nodeId = nodeId;
+        this.partition = partition;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        LOGGER.info("Partition {} master changed, new master {}", partition, nodeId);
+        IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
+        clusterStateManager.updateClusterPartition(partition, nodeId, true);
+        clusterStateManager.refreshState();
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 9099c88..2576c4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.runtime.utils;
 
 import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
 import org.apache.asterix.common.api.IConfigValidator;
@@ -101,6 +102,8 @@
     private final IConfigValidator configValidator;
     private final IAdapterFactoryService adapterFactoryService;
 
+    private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
+
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
             INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
@@ -314,4 +317,9 @@
     public IAdapterFactoryService getAdapterFactoryService() {
         return adapterFactoryService;
     }
+
+    @Override
+    public ReentrantReadWriteLock getCompilationLock() {
+        return compilationLock;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..fc99792 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -24,12 +24,14 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -83,6 +85,8 @@
     @Override
     public void setCcAppCtx(ICcApplicationContext appCtx) {
         this.appCtx = appCtx;
+        //TODO we need to maintain the node2PartitionsMap in the same state before a failover and only change the
+        // a partition's current active node
         node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
         clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
         currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName();
@@ -139,20 +143,33 @@
     }
 
     @Override
-    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
+    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters,
+            Set<Integer> nodePartitions) {
         if (active) {
             updateClusterCounters(nodeId, localCounters);
             participantNodes.add(nodeId);
         } else {
             participantNodes.remove(nodeId);
         }
-        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
-        // if this isn't a storage node, it will not have cluster partitions
+        //        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
         if (nodePartitions != null) {
-            for (ClusterPartition p : nodePartitions) {
-                updateClusterPartition(p.getPartitionId(), nodeId, active);
+            for (Integer partitionId : nodePartitions) {
+                updateClusterPartition(partitionId, nodeId, active);
+            }
+        } else {
+            List<ClusterPartition> nodeActivePartitions = clusterPartitions.values().stream()
+                    .filter(clusterPartition -> clusterPartition.getActiveNodeId().equals(nodeId))
+                    .collect(Collectors.toList());
+            for (ClusterPartition partition : nodeActivePartitions) {
+                updateClusterPartition(partition.getPartitionId(), nodeId, active);
             }
         }
+        // if this isn't a storage node, it will not have cluster partitions
+        //        if (nodePartitions != null) {
+        //            for (ClusterPartition p : nodePartitions) {
+        //                updateClusterPartition(p.getPartitionId(), nodeId, active);
+        //            }
+        //        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
index 0a0e026..c7be4ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
@@ -59,7 +59,7 @@
     @Override
     public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException {
         if (cached == null) {
-            cached = new FileReference(ioManager.getIODevices().get(ioDevice), getPath());
+            cached = super.getFileReference(ioManager);
         }
         return cached;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index f54a62c..5166970 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -184,6 +184,9 @@
 
     public synchronized void failNode(String nodeId) throws HyracksException {
         NodeControllerState state = nodeRegistry.get(nodeId);
+        if (state == null) {
+            return;
+        }
         Set<JobId> affectedJobIds = state.getActiveJobIds();
         // Removes the node from node map.
         nodeRegistry.remove(nodeId);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
Gerrit-Change-Number: 12404
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: newchange

Change in asterixdb[master]: PLEASE EDIT to provide a meaningful commit message!

Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
Anon. E. Moose #1000171 has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404 )

Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................


Patch Set 1: Contrib-2

Analytics Compatibility Compilation Failed
https://cbjenkins.page.link/c23YKBVJTLRMNNwE6 : UNSTABLE


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
Gerrit-Change-Number: 12404
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-CC: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Comment-Date: Tue, 20 Jul 2021 00:33:31 +0000
Gerrit-HasComments: No
Gerrit-Has-Labels: Yes
Gerrit-MessageType: comment

Change in asterixdb[master]: PLEASE EDIT to provide a meaningful commit message!

Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Jenkins <je...@fulliautomatix.ics.uci.edu>:

Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404 )

Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................


Patch Set 1: Integration-Tests-1

Integration Tests Failed

https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/12240/ : UNSTABLE


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
Gerrit-Change-Number: 12404
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Comment-Date: Tue, 20 Jul 2021 01:48:36 +0000
Gerrit-HasComments: No
Gerrit-Has-Labels: Yes
Gerrit-MessageType: comment