You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2021/07/26 00:43:15 UTC
Change in asterixdb[master]: WIP: Failover with -1
From Murtadha Hubail <mh...@apache.org>:
Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12503 )
Change subject: WIP: Failover with -1
......................................................................
WIP: Failover with -1
Change-Id: I3b8de5d3354be36768e3c9da116c3c96a552a867
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
A asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
41 files changed, 421 insertions(+), 95 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/03/12503/1
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d7d5994..8fd16ff 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -135,6 +135,7 @@
final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
replicaJson.put("location", toHostPort(replica.getIdentifier().getLocation()));
replicaJson.put("status", replica.getStatus().toString());
+ replicaJson.put("nodeId", replica.getIdentifier().getNodeId());
replicasArray.add(replicaJson);
}
partitionJson.set("replicas", replicasArray);
@@ -167,11 +168,12 @@
final String partition = request.getParameter("partition");
final String host = request.getParameter("host");
final String port = request.getParameter("port");
+ final String nodeId = request.getParameter("nodeId");
if (partition == null || host == null || port == null) {
return null;
}
- final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
- return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
+ final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.parseInt(port));
+ return ReplicaIdentifier.of(Integer.parseInt(partition), nodeId, replicaAddress);
}
private void processPromote(IServletRequest request, IServletResponse response) throws HyracksDataException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..53d0973 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -38,7 +38,6 @@
import org.apache.asterix.common.api.IPropertiesFactory;
import org.apache.asterix.common.api.IReceptionist;
import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.BuildProperties;
import org.apache.asterix.common.config.CompilerProperties;
@@ -225,9 +224,9 @@
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
final String nodeId = getServiceContext().getNodeId();
- final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+ final String[] nodePartitions = nodeProperties.getActivePartitions();
final Set<Integer> nodePartitionsIds =
- Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+ Arrays.stream(nodePartitions).map(Integer::parseInt).collect(Collectors.toSet());
replicaManager = new ReplicaManager(this, nodePartitionsIds);
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 0359cf1..a0adde1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -160,7 +160,7 @@
@Override
public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
state = SystemState.RECOVERING;
- LOGGER.info("starting recovery ...");
+ LOGGER.info("starting recovery for partitions {}", partitions);
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -362,19 +362,17 @@
index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
datasetLifecycleManager.register(localResource.getPath(), index);
datasetLifecycleManager.open(localResource.getPath());
- try {
- final DatasetResourceReference resourceReference =
- DatasetResourceReference.of(localResource);
- maxDiskLastLsn =
- indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
- } catch (HyracksDataException e) {
- datasetLifecycleManager.close(localResource.getPath());
- throw e;
- }
+ maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+ indexCheckpointManagerProvider);
//#. set resourceId and maxDiskLastLSN to the map
resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
} else {
- maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+ if (!resourceId2MaxLSNMap.containsKey(resourceId)) {
+ maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+ indexCheckpointManagerProvider);
+ } else {
+ maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+ }
}
// lsn @ maxDiskLastLsn is either a flush log or a master replica log
if (lsn >= maxDiskLastLsn) {
@@ -450,6 +448,19 @@
}
}
+ private long getResourceLowWaterMark(LocalResource localResource, IDatasetLifecycleManager datasetLifecycleManager,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider) throws HyracksDataException {
+ long maxDiskLastLsn;
+ try {
+ final DatasetResourceReference resourceReference = DatasetResourceReference.of(localResource);
+ maxDiskLastLsn = indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+ } catch (HyracksDataException e) {
+ datasetLifecycleManager.close(localResource.getPath());
+ throw e;
+ }
+ return maxDiskLastLsn;
+ }
+
private boolean needToFreeMemory() {
return Runtime.getRuntime().freeMemory() < cachedEntityCommitsPerJobSize;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index ad70cf4..546af88 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -30,11 +30,14 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.ReplicaPromotionMessage;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -68,17 +71,14 @@
@Override
public synchronized void addReplica(ReplicaIdentifier id) {
- final NodeControllerService controllerService =
- (NodeControllerService) appCtx.getServiceContext().getControllerService();
- final NodeStatus nodeStatus = controllerService.getNodeStatus();
- if (nodeStatus != NodeStatus.ACTIVE) {
- LOGGER.warn("Ignoring request to add replica. Node is not ACTIVE yet. Current status: {}", nodeStatus);
+ if (nodeNotReady("addReplica")) {
return;
}
- if (!partitions.contains(id.getPartition())) {
- throw new IllegalStateException(
- "This node is not the current master of partition(" + id.getPartition() + ")");
- }
+ //FIXIT restore this condition when -1 is added to the set of active partitions
+ // if (!partitions.contains(id.getPartition())) {
+ // throw new IllegalStateException(
+ // "This node is not the current master of partition(" + id.getPartition() + ")");
+ // }
if (isSelf(id)) {
LOGGER.info("ignoring request to add replica to ourselves");
return;
@@ -89,8 +89,13 @@
@Override
public synchronized void removeReplica(ReplicaIdentifier id) {
+ if (nodeNotReady("removeReplica")) {
+ return;
+ }
+ //TODO return illegal state?
if (!replicas.containsKey(id)) {
- throw new IllegalStateException("replica with id(" + id + ") does not exist");
+ return;
+ // throw new IllegalStateException("replica with id(" + id + ") does not exist");
}
PartitionReplica replica = replicas.remove(id);
appCtx.getReplicationManager().unregister(replica);
@@ -110,6 +115,9 @@
@Override
public synchronized void promote(int partition) throws HyracksDataException {
+ if (nodeNotReady("promote")) {
+ return;
+ }
if (partitions.contains(partition)) {
return;
}
@@ -119,15 +127,30 @@
final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
partitions.add(partition);
+ // TODO should we just persist the takeover success on metakv to prevent any race between CC being active
+ // and the partition being active?
+ ReplicaPromotionMessage message =
+ new ReplicaPromotionMessage(appCtx.getTransactionSubsystem().getId(), partition);
+ INCMessageBroker messageBroker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ try {
+ messageBroker.sendMessageToPrimaryCC(message);
+ } catch (Exception e) {
+ //TODO halt
+ LOGGER.error("failed to notify CC of replica promotion");
+ }
}
@Override
public synchronized void release(int partition) throws HyracksDataException {
+ if (nodeNotReady("release")) {
+ throw new IllegalStateException("received release request while node is not ready");
+ }
if (!partitions.contains(partition)) {
return;
}
final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
- datasetLifecycleManager.flushDataset(appCtx.getReplicationManager().getReplicationStrategy());
+ IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+ datasetLifecycleManager.flushDataset(replicationStrategy);
closePartitionResources(partition);
final List<IPartitionReplica> partitionReplicas = getReplicas(partition);
for (IPartitionReplica replica : partitionReplicas) {
@@ -141,14 +164,15 @@
return replicaSyncLock;
}
- private void closePartitionResources(int partition) throws HyracksDataException {
+ public void closePartitionResources(int partition) throws HyracksDataException {
final PersistentLocalResourceRepository resourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
for (LocalResource resource : partitionResources.values()) {
- datasetLifecycleManager.close(resource.getPath());
+ datasetLifecycleManager.closeIfOpen(resource.getPath());
}
+ datasetLifecycleManager.closePartition(partition);
}
private boolean isSelf(ReplicaIdentifier id) {
@@ -157,6 +181,17 @@
int port = appConfig.getInt(NCConfig.Option.REPLICATION_LISTEN_PORT);
final InetSocketAddress replicaAddress = new InetSocketAddress(host, port);
- return id.equals(ReplicaIdentifier.of(id.getPartition(), replicaAddress));
+ return id.equals(ReplicaIdentifier.of(id.getPartition(), id.getNodeId(), replicaAddress));
+ }
+
+ private boolean nodeNotReady(String request) {
+ final NodeControllerService controllerService =
+ (NodeControllerService) appCtx.getServiceContext().getControllerService();
+ final NodeStatus nodeStatus = controllerService.getNodeStatus();
+ if (nodeStatus != NodeStatus.ACTIVE) {
+ LOGGER.warn("Ignoring request {}. Node is not ACTIVE yet. Current status: {}", request, nodeStatus);
+ return true;
+ }
+ return false;
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index f58f871..f19d896 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -38,6 +38,7 @@
public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
try {
+ //TODO do we need to promote here?
appContext.getReplicaManager().promote(partitionId);
SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
appContext.initializeMetadata(state == SystemState.PERMANENT_DATA_LOSS, partitionId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..cacb15a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@
import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,8 @@
@Override
public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
pendingStartupCompletionNodes.remove(nodeId);
- clusterManager.updateNodeState(nodeId, false, null);
+ //TODO on node failure, we need to set all of its active partitions to inactive
+ clusterManager.updateNodeState(nodeId, false, null, null);
if (nodeId.equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, false);
}
@@ -138,7 +136,8 @@
private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
nodeSecretsMap.put(nodeId, msg.getSecrets());
- List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+ List<INCLifecycleTask> tasks =
+ buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
@@ -157,7 +156,7 @@
return;
}
if (msg.isSuccess()) {
- clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
+ clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getPartitions());
if (msg.getNodeId().equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, true);
}
@@ -167,7 +166,8 @@
}
}
- protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+ protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
+ Set<Integer> activePartitions) {
LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
state);
final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +175,7 @@
case ACTIVE:
return buildActiveNCRegTasks(isMetadataNode);
case IDLE:
- return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+ return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
default:
return new ArrayList<>();
}
@@ -210,15 +210,18 @@
}
}
- protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
+ protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
+ Set<Integer> activePartitions) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
if (state == SystemState.CORRUPTED) {
- // need to perform local recovery for node partitions
- LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
- .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+ // need to perform local recovery for node active partitions
+ //TODO ensure recovery is done for all active partitions before the CheckpointTask since that
+ // might delete any old transaction log files
+ LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
tasks.add(rt);
}
+
if (replicationEnabled) {
tasks.add(new StartReplicationServiceTask());
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..c70b6bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.replication.message;
+import java.util.Set;
+
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@
private final boolean success;
private Throwable exception;
private final NcLocalCounters localCounters;
+ private Set<Integer> partitions;
- public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
+ public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters,
+ Set<Integer> partitions) {
this.nodeId = nodeId;
this.success = success;
this.localCounters = localCounters;
+ this.partitions = partitions;
}
@Override
@@ -63,6 +68,10 @@
return localCounters;
}
+ public Set<Integer> getPartitions() {
+ return partitions;
+ }
+
@Override
public MessageType getType() {
return MessageType.REGISTRATION_TASKS_RESULT;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..870413b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -37,25 +38,27 @@
public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
private static final Logger LOGGER = LogManager.getLogger();
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
protected final SystemState state;
protected final String nodeId;
protected final NodeStatus nodeStatus;
protected final Map<String, Object> secrets;
+ protected final Set<Integer> activePartitions;
public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
- Map<String, Object> secretsEphemeral) {
+ Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
this.state = state;
this.nodeId = nodeId;
this.nodeStatus = nodeStatus;
this.secrets = new HashMap<>(secretsEphemeral);
+ this.activePartitions = activePartitions;
}
public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
- Map<String, Object> secretsEphemeral) throws HyracksDataException {
+ Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException {
try {
- RegistrationTasksRequestMessage msg =
- new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
+ RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+ systemState, secretsEphemeral, activePartitions);
((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@
public Map<String, Object> getSecrets() {
return secrets;
}
+
+ public Set<Integer> getActivePartitions() {
+ return activePartitions;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..7848849 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
package org.apache.asterix.app.replication.message;
import java.util.List;
+import java.util.Set;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@
}
NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
(NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
- NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+ Set<Integer> partitions = appCtx.getReplicaManager().getPartitions();
+ NCLifecycleTaskReportMessage result =
+ new NCLifecycleTaskReportMessage(nodeId, success, localCounter, partitions);
result.setException(exception);
try {
broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3571d88..76eb184 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -42,6 +42,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.asterix.active.ActivityState;
@@ -272,6 +273,7 @@
protected final ICcApplicationContext appCtx;
protected final SessionOutput sessionOutput;
protected final SessionConfig sessionConfig;
+ protected final ReentrantReadWriteLock compilationLock;
protected Dataverse activeDataverse;
protected final List<FunctionDecl> declaredFunctions;
protected final ILangCompilationProvider compilationProvider;
@@ -290,6 +292,7 @@
this.appCtx = appCtx;
this.lockManager = appCtx.getMetadataLockManager();
this.lockUtil = appCtx.getMetadataLockUtil();
+ this.compilationLock = appCtx.getCompilationLock();
this.statements = statements;
this.sessionOutput = output;
this.sessionConfig = output.config();
@@ -3367,12 +3370,14 @@
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() throws AlgebricksException {
+ compilationLock.readLock().lock();
lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
}
@Override
public void unlock() {
metadataProvider.getLocks().unlock();
+ compilationLock.readLock().unlock();
}
};
final IStatementCompiler compiler = () -> {
@@ -3962,6 +3967,7 @@
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() {
+ compilationLock.readLock().lock();
}
@Override
@@ -3969,6 +3975,7 @@
metadataProvider.getLocks().unlock();
// release external datasets' locks acquired during compilation of the query
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+ compilationLock.readLock().unlock();
}
};
final IStatementCompiler compiler = () -> {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..1f55631 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -36,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -299,8 +300,10 @@
final Map httpSecrets =
apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
: Collections.emptyMap();
+ Set<Integer> activePartitions = Arrays.stream(runtimeContext.getNodeProperties().getActivePartitions())
+ .map(Integer::parseInt).collect(Collectors.toSet());
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
- currentStatus, systemState, httpSecrets);
+ currentStatus, systemState, httpSecrets, activePartitions);
}
@Override
@@ -340,7 +343,11 @@
@Override
public IFileDeviceResolver getFileDeviceResolver() {
return (relPath, devices) -> {
- int ioDeviceIndex = Math.abs(StoragePathUtil.getPartitionNumFromRelativePath(relPath) % devices.size());
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(relPath);
+ if (partition < 0) {
+ return devices.get(0);
+ }
+ int ioDeviceIndex = Math.abs(partition % devices.size());
return devices.get(ioDeviceIndex);
};
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..370b330 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -200,7 +200,8 @@
private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
throws HyracksDataException {
- NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+ NCLifecycleTaskReportMessage msg =
+ new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), Collections.emptySet());
applicationContext.getNcLifecycleCoordinator().process(msg);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index b03af55..48bada3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -169,4 +169,8 @@
* @return the current datasets io stats
*/
StorageIOStats getDatasetsIOStats();
+
+ void closeIfOpen(String resourcePath) throws HyracksDataException;
+
+ void closePartition(int partitionId);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc99421..9c66acf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -33,6 +33,15 @@
this.ioDeviceNum = ioDeviceNum;
}
+ public static ClusterPartition of(int partitionId, String nodeId, int ioDeviceNum, String activeNodeId,
+ boolean active, boolean pendingActivation) {
+ ClusterPartition partition = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ partition.activeNodeId = activeNodeId;
+ partition.active = active;
+ partition.pendingActivation = pendingActivation;
+ return partition;
+ }
+
public int getPartitionId() {
return partitionId;
}
@@ -71,7 +80,10 @@
@Override
public ClusterPartition clone() {
- return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ clone.setActiveNodeId(activeNodeId);
+ clone.setActive(active);
+ return clone;
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..94f80cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@
* @param nodeId
* @param active
* @param ncLocalCounters
+ * @param partitions
* @throws HyracksDataException
*/
- void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
+ void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> partitions)
+ throws HyracksDataException;
/**
* Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..efc9b6a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,11 @@
STARTING_PARTITION_ID(
OptionTypes.INTEGER,
-1,
- "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
+ "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
+ ACTIVE_PARTITIONS(
+ OptionTypes.STRING_ARRAY,
+ new String[] {},
+ "List of partitions this node is currently the master of");
private final IOptionType type;
private final Object defaultValue;
@@ -95,7 +99,7 @@
@Override
public boolean hidden() {
- return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+ return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS;
}
}
@@ -115,4 +119,8 @@
public String getTxnLogDir() {
return accessor.getString(Option.TXN_LOG_DIR);
}
+
+ public String[] getActivePartitions() {
+ return accessor.getStringArray(Option.ACTIVE_PARTITIONS);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5d1a8e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -132,6 +132,7 @@
}
stores.put(ncId, nodeStores);
nodePartitionsMap.put(ncId, nodePartitions);
+ LOGGER.info("Node partitions {}", nodeCfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS));
}
private void loadAsterixBuildProperties() throws AsterixException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b2f4034..4b05629 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -113,6 +113,8 @@
@Override
public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+ LOGGER.info("registering {} with dataset life cycle manager. Thread: {}", index,
+ Thread.currentThread().getStackTrace());
validateDatasetLifecycleManagerState();
int did = getDIDfromResourcePath(resourcePath);
LocalResource resource = resourceRepository.get(resourcePath);
@@ -141,6 +143,8 @@
@Override
public synchronized void unregister(String resourcePath) throws HyracksDataException {
+ LOGGER.info("unregister {} with dataset life cycle manager. Thread: {}", resourcePath,
+ Thread.currentThread().getStackTrace());
validateDatasetLifecycleManagerState();
int did = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -559,6 +563,51 @@
return stats;
}
+ @Override
+ public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
+ validateDatasetLifecycleManagerState();
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+ DatasetResource dsr = datasets.get(did);
+ IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+
+ if (dsr == null || iInfo == null) {
+ return;
+ }
+
+ PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+ if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+ if (LOGGER.isErrorEnabled()) {
+ final String logMsg = String.format(
+ "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+ resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+ LOGGER.error(logMsg);
+ }
+ throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+ StoragePathUtil.getIndexNameFromPath(resourcePath));
+ }
+
+ // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+ DatasetInfo dsInfo = dsr.getDatasetInfo();
+ dsInfo.waitForIO();
+ closeIndex(iInfo);
+ dsInfo.removeIndex(resourceID);
+ synchronized (dsInfo) {
+ if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+ && !dsInfo.isExternal()) {
+ removeDatasetFromCache(dsInfo.getDatasetID());
+ }
+ }
+ }
+
+ @Override
+ public synchronized void closePartition(int partitionId) {
+ for (DatasetResource ds : datasets.values()) {
+ ds.removePartition(partitionId);
+ }
+ }
+
private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
if (indexInfo.isOpen()) {
ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 54e1976..db9eabb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -182,4 +182,10 @@
public boolean isMetadataDataset() {
return MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID());
}
+
+ public void removePartition(int partitionId) {
+ datasetPrimaryOpTrackers.remove(partitionId);
+ datasetComponentIdGenerators.remove(partitionId);
+ datasetRateLimiters.remove(partitionId);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
index 7f448e3..249505b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
@@ -41,7 +41,7 @@
/**
* The resource partition
*/
- private final int partition;
+ private int partition;
private final IResource resource;
public DatasetLocalResource(int datasetId, int partition, IResource resource) {
@@ -68,6 +68,10 @@
resource.setPath(path);
}
+ public void setPartition(int partition) {
+ this.partition = partition;
+ }
+
@Override
public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
return resource.createInstance(ncServiceCtx);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index ad90814..72fb32c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.dataflow;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IMetadataLockManager;
@@ -149,4 +151,11 @@
* @return the adapter factory service
*/
IAdapterFactoryService getAdapterFactoryService();
+
+ /**
+ * Gets the compilation lock
+ *
+ * @return the compilation lock
+ */
+ ReentrantReadWriteLock getCompilationLock();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
index a8cd14f..46bca34 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
@@ -20,10 +20,12 @@
public class MetadataIndexImmutableProperties {
+ public static final int METADATA_PARTITION_ID = -1;
public static final int FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID = 52;
public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
public static final int METADATA_DATASETS_PARTITIONS = 1;
public static final int METADATA_DATASETS_COUNT = 15;
+ // public static final Map<Integer, Integer>
private final String indexName;
private final int datasetId;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
index c969777..1dfc2e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
@@ -24,4 +24,9 @@
public boolean isMatch(int datasetId) {
return true;
}
+
+ @Override
+ public String getName() {
+ return "all";
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
new file mode 100644
index 0000000..7d73e79
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+
+public class AllNonMetadataDatasetsReplicationStrategy implements IReplicationStrategy {
+
+ @Override
+ public boolean isMatch(int datasetId) {
+ return !MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
+ }
+
+ @Override
+ public String getName() {
+ return "non-metadata";
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
index 0ad8be2..340bad4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -18,11 +18,15 @@
*/
package org.apache.asterix.common.replication;
-public interface IReplicationStrategy {
+import java.io.Serializable;
+
+public interface IReplicationStrategy extends Serializable {
/**
* @param datasetId
* @return True, if the dataset should be replicated. Otherwise false.
*/
boolean isMatch(int datasetId);
+
+ String getName();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index 2b97fe7..5278208 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -26,4 +26,9 @@
public boolean isMatch(int datasetId) {
return MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
}
+
+ @Override
+ public String getName() {
+ return "metadata";
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
index 8ffaa15..49083c8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -24,4 +24,9 @@
public boolean isMatch(int datasetId) {
return false;
}
+
+ @Override
+ public String getName() {
+ return "none";
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
index c4bb74c..95ea055 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -27,21 +27,29 @@
private final int partition;
private final String id;
private volatile InetSocketAddress location;
+ private final String nodeId;
+ private final int storagePartition;
- private ReplicaIdentifier(int partition, InetSocketAddress location) {
+ private ReplicaIdentifier(int partition, String nodeId, InetSocketAddress location) {
this.partition = partition;
this.location = location;
+ this.nodeId = nodeId;
+ storagePartition = partition;
id = partition + "@" + location.getHostString() + ":" + location.getPort();
}
- public static ReplicaIdentifier of(int partition, InetSocketAddress location) {
- return new ReplicaIdentifier(partition, location);
+ public static ReplicaIdentifier of(int partition, String nodeId, InetSocketAddress location) {
+ return new ReplicaIdentifier(partition, nodeId, location);
}
public int getPartition() {
return partition;
}
+ public int getStoragePartition() {
+ return storagePartition;
+ }
+
public InetSocketAddress getLocation() {
return location;
}
@@ -52,6 +60,10 @@
return location;
}
+ public String getNodeId() {
+ return nodeId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index d803756..06650b0 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -78,7 +78,7 @@
}
public synchronized Optional<IPartitionReplica> getPartitionReplica(int partition) {
- return replicas.stream().filter(replica -> replica.getIdentifier().getPartition() == partition
+ return replicas.stream().filter(replica -> replica.getIdentifier().getStoragePartition() == partition
&& replica.getStatus() == IPartitionReplica.PartitionReplicaStatus.IN_SYNC).findAny();
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index dd953c4..aadea22 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -101,7 +101,8 @@
private void process(IReplicationJob job) {
try {
- if (skip(job)) {
+ DatasetResourceReference resourceRef = getJobDatasetResource(job);
+ if (skip(resourceRef)) {
return;
}
synchronized (transferLock) {
@@ -109,11 +110,11 @@
return;
}
final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
- final int indexPartition = getJobPartition(job);
+ final int indexPartition = resourceRef.getPartitionNum();
for (ReplicationDestination dest : destinations) {
try {
Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
- if (!partitionReplica.isPresent()) {
+ if (partitionReplica.isEmpty()) {
continue;
}
PartitionReplica replica = (PartitionReplica) partitionReplica.get();
@@ -129,23 +130,20 @@
}
}
- private boolean skip(IReplicationJob job) {
+ private boolean skip(DatasetResourceReference indexFileRef) {
+ return indexFileRef == null || !replicationStrategy.isMatch(indexFileRef.getDatasetId());
+ }
+
+ private DatasetResourceReference getJobDatasetResource(IReplicationJob job) {
try {
- final String fileToReplicate = job.getAnyFile();
- final Optional<DatasetResourceReference> indexFileRefOpt =
- resourceRepository.getLocalResourceReference(fileToReplicate);
- if (!indexFileRefOpt.isPresent()) {
- LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
- return true;
- }
- return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+ return resourceRepository.getLocalResourceReference(job.getAnyFile()).orElse(null);
} catch (HyracksDataException e) {
throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
}
}
- private int getJobPartition(IReplicationJob job) {
- return ResourceReference.of(job.getAnyFile()).getPartitionNum();
+ private ResourceReference getJobPartition(IReplicationJob job) {
+ return ResourceReference.of(job.getAnyFile());
}
private void closeChannels() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6a23ae6..a805d82 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -128,6 +128,7 @@
public void replicate(ILogRecord logRecord) throws InterruptedException {
if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
synchronized (destinations) {
+ //TODO filter destinations
ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 767eb76..e0f9a8f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -59,6 +59,8 @@
PersistentLocalResourceRepository resRepo =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
final IIOManager ioManager = appCtx.getIoManager();
+ //TODO this should consider replication strategy to avoid checkpointing metadata datasets
+ // when only other datasets should be checkpointed
final Collection<LocalResource> partitionResources = resRepo.getPartitionResources(partition).values();
final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
for (LocalResource ls : partitionResources) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index cff12de..91bc35b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -48,11 +48,12 @@
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException {
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+ //TODO do we need to sync such tasks?
localResourceRepository.cleanup(partition);
- final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
- final List<String> partitionResources =
- localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
- .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+ //TODO need to include checksum of each file too
+ IReplicationStrategy strategy = appCtx.getReplicationManager().getReplicationStrategy();
+ final List<String> partitionResources = localResourceRepository.getPartitionReplicatedFiles(partition, strategy)
+ .stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
final PartitionResourcesListResponse response =
new PartitionResourcesListResponse(partition, partitionResources);
ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
new file mode 100644
index 0000000..e73bcb2
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ReplicaPromotionMessage implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private int partition;
+
+ public ReplicaPromotionMessage(String nodeId, int partition) {
+ this.nodeId = nodeId;
+ this.partition = partition;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ LOGGER.info("Partition {} master changed, new master {}", partition, nodeId);
+ IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
+ clusterStateManager.updateClusterPartition(partition, nodeId, true);
+ clusterStateManager.refreshState();
+ }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index b47fd39..a01a2ba 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -26,7 +26,6 @@
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
@@ -49,14 +48,13 @@
}
public void sync() throws IOException {
- final int partition = replica.getIdentifier().getPartition();
+ final int partition = replica.getIdentifier().getStoragePartition();
final Set<String> replicaFiles = getReplicaFiles(partition);
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
- final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
- final Set<String> masterFiles =
- localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
- .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+ final Set<String> masterFiles = localResourceRepository
+ .getPartitionReplicatedFiles(partition, appCtx.getReplicationManager().getReplicationStrategy())
+ .stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
// find files on master and not on replica
final List<String> replicaMissingFiles =
masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 261236c..313263e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -60,15 +60,15 @@
private void syncFiles() throws IOException {
final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
- // flush replicated dataset to generate disk component for any remaining in-memory components
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+ // flush replicated dataset to generate disk component for any remaining in-memory components
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
waitForReplicatedDatasetsIO();
fileSync.sync();
}
private void checkpointReplicaIndexes() throws IOException {
- final int partition = replica.getIdentifier().getPartition();
+ final int partition = replica.getIdentifier().getStoragePartition();
CheckpointPartitionIndexesTask task =
new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition));
ReplicationProtocol.sendTo(replica, task);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 9099c88..2576c4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -19,6 +19,7 @@
package org.apache.asterix.runtime.utils;
import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.asterix.common.api.IConfigValidator;
@@ -101,6 +102,8 @@
private final IConfigValidator configValidator;
private final IAdapterFactoryService adapterFactoryService;
+ private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
+
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
@@ -314,4 +317,9 @@
public IAdapterFactoryService getAdapterFactoryService() {
return adapterFactoryService;
}
+
+ @Override
+ public ReentrantReadWriteLock getCompilationLock() {
+ return compilationLock;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..fc99792 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -24,12 +24,14 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
@@ -83,6 +85,8 @@
@Override
public void setCcAppCtx(ICcApplicationContext appCtx) {
this.appCtx = appCtx;
+ //TODO we need to maintain the node2PartitionsMap in the same state before a failover and only change
+ // a partition's current active node
node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName();
@@ -139,20 +143,33 @@
}
@Override
- public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
+ public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters,
+ Set<Integer> nodePartitions) {
if (active) {
updateClusterCounters(nodeId, localCounters);
participantNodes.add(nodeId);
} else {
participantNodes.remove(nodeId);
}
- ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
- // if this isn't a storage node, it will not have cluster partitions
+ // ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
if (nodePartitions != null) {
- for (ClusterPartition p : nodePartitions) {
- updateClusterPartition(p.getPartitionId(), nodeId, active);
+ for (Integer partitionId : nodePartitions) {
+ updateClusterPartition(partitionId, nodeId, active);
+ }
+ } else {
+ List<ClusterPartition> nodeActivePartitions = clusterPartitions.values().stream()
+ .filter(clusterPartition -> clusterPartition.getActiveNodeId().equals(nodeId))
+ .collect(Collectors.toList());
+ for (ClusterPartition partition : nodeActivePartitions) {
+ updateClusterPartition(partition.getPartitionId(), nodeId, active);
}
}
+ // if this isn't a storage node, it will not have cluster partitions
+ // if (nodePartitions != null) {
+ // for (ClusterPartition p : nodePartitions) {
+ // updateClusterPartition(p.getPartitionId(), nodeId, active);
+ // }
+ // }
}
@Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 29dedf7..556ab6a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -248,7 +248,7 @@
}
}
- private static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath)
+ public static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath)
throws HyracksDataException {
String fileName = resourcePath + File.separator + StorageConstants.METADATA_FILE_NAME;
return ioManager.resolve(fileName);
@@ -603,4 +603,8 @@
private static boolean isComponentFile(File indexDir, String fileName) {
return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
}
+
+ public Path[] getStorageRoots() {
+ return storageRoots;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
index 0a0e026..c7be4ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
@@ -59,7 +59,7 @@
@Override
public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException {
if (cached == null) {
- cached = new FileReference(ioManager.getIODevices().get(ioDevice), getPath());
+ cached = super.getFileReference(ioManager);
}
return cached;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index f54a62c..5166970 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -184,6 +184,9 @@
public synchronized void failNode(String nodeId) throws HyracksException {
NodeControllerState state = nodeRegistry.get(nodeId);
+ if (state == null) {
+ return;
+ }
Set<JobId> affectedJobIds = state.getActiveJobIds();
// Removes the node from node map.
nodeRegistry.remove(nodeId);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12503
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I3b8de5d3354be36768e3c9da116c3c96a552a867
Gerrit-Change-Number: 12503
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: newchange
Change in asterixdb[master]: WIP: Failover with -1
Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Murtadha Hubail <mh...@apache.org>:
Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12503 )
Change subject: WIP: Failover with -1
......................................................................
WIP: Failover with -1
Change-Id: I3b8de5d3354be36768e3c9da116c3c96a552a867
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
A asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
41 files changed, 421 insertions(+), 95 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/03/12503/1
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d7d5994..8fd16ff 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -135,6 +135,7 @@
final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
replicaJson.put("location", toHostPort(replica.getIdentifier().getLocation()));
replicaJson.put("status", replica.getStatus().toString());
+ replicaJson.put("nodeId", replica.getIdentifier().getNodeId());
replicasArray.add(replicaJson);
}
partitionJson.set("replicas", replicasArray);
@@ -167,11 +168,12 @@
final String partition = request.getParameter("partition");
final String host = request.getParameter("host");
final String port = request.getParameter("port");
+ final String nodeId = request.getParameter("nodeId");
if (partition == null || host == null || port == null) {
return null;
}
- final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
- return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
+ final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.parseInt(port));
+ return ReplicaIdentifier.of(Integer.parseInt(partition), nodeId, replicaAddress);
}
private void processPromote(IServletRequest request, IServletResponse response) throws HyracksDataException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..53d0973 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -38,7 +38,6 @@
import org.apache.asterix.common.api.IPropertiesFactory;
import org.apache.asterix.common.api.IReceptionist;
import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.BuildProperties;
import org.apache.asterix.common.config.CompilerProperties;
@@ -225,9 +224,9 @@
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
final String nodeId = getServiceContext().getNodeId();
- final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+ final String[] nodePartitions = nodeProperties.getActivePartitions();
final Set<Integer> nodePartitionsIds =
- Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+ Arrays.stream(nodePartitions).map(Integer::parseInt).collect(Collectors.toSet());
replicaManager = new ReplicaManager(this, nodePartitionsIds);
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 0359cf1..a0adde1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -160,7 +160,7 @@
@Override
public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
state = SystemState.RECOVERING;
- LOGGER.info("starting recovery ...");
+ LOGGER.info("starting recovery for partitions {}", partitions);
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -362,19 +362,17 @@
index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
datasetLifecycleManager.register(localResource.getPath(), index);
datasetLifecycleManager.open(localResource.getPath());
- try {
- final DatasetResourceReference resourceReference =
- DatasetResourceReference.of(localResource);
- maxDiskLastLsn =
- indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
- } catch (HyracksDataException e) {
- datasetLifecycleManager.close(localResource.getPath());
- throw e;
- }
+ maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+ indexCheckpointManagerProvider);
//#. set resourceId and maxDiskLastLSN to the map
resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
} else {
- maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+ if (!resourceId2MaxLSNMap.containsKey(resourceId)) {
+ maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+ indexCheckpointManagerProvider);
+ } else {
+ maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+ }
}
// lsn @ maxDiskLastLsn is either a flush log or a master replica log
if (lsn >= maxDiskLastLsn) {
@@ -450,6 +448,19 @@
}
}
+ private long getResourceLowWaterMark(LocalResource localResource, IDatasetLifecycleManager datasetLifecycleManager,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider) throws HyracksDataException {
+ long maxDiskLastLsn;
+ try {
+ final DatasetResourceReference resourceReference = DatasetResourceReference.of(localResource);
+ maxDiskLastLsn = indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+ } catch (HyracksDataException e) {
+ datasetLifecycleManager.close(localResource.getPath());
+ throw e;
+ }
+ return maxDiskLastLsn;
+ }
+
private boolean needToFreeMemory() {
return Runtime.getRuntime().freeMemory() < cachedEntityCommitsPerJobSize;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index ad70cf4..546af88 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -30,11 +30,14 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.ReplicaPromotionMessage;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -68,17 +71,14 @@
@Override
public synchronized void addReplica(ReplicaIdentifier id) {
- final NodeControllerService controllerService =
- (NodeControllerService) appCtx.getServiceContext().getControllerService();
- final NodeStatus nodeStatus = controllerService.getNodeStatus();
- if (nodeStatus != NodeStatus.ACTIVE) {
- LOGGER.warn("Ignoring request to add replica. Node is not ACTIVE yet. Current status: {}", nodeStatus);
+ if (nodeNotReady("addReplica")) {
return;
}
- if (!partitions.contains(id.getPartition())) {
- throw new IllegalStateException(
- "This node is not the current master of partition(" + id.getPartition() + ")");
- }
+ //FIXIT return this condition when -1 is added to the set of active partitions
+ // if (!partitions.contains(id.getPartition())) {
+ // throw new IllegalStateException(
+ // "This node is not the current master of partition(" + id.getPartition() + ")");
+ // }
if (isSelf(id)) {
LOGGER.info("ignoring request to add replica to ourselves");
return;
@@ -89,8 +89,13 @@
@Override
public synchronized void removeReplica(ReplicaIdentifier id) {
+ if (nodeNotReady("removeReplica")) {
+ return;
+ }
+ //TODO return illegal state?
if (!replicas.containsKey(id)) {
- throw new IllegalStateException("replica with id(" + id + ") does not exist");
+ return;
+ // throw new IllegalStateException("replica with id(" + id + ") does not exist");
}
PartitionReplica replica = replicas.remove(id);
appCtx.getReplicationManager().unregister(replica);
@@ -110,6 +115,9 @@
@Override
public synchronized void promote(int partition) throws HyracksDataException {
+ if (nodeNotReady("promote")) {
+ return;
+ }
if (partitions.contains(partition)) {
return;
}
@@ -119,15 +127,30 @@
final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
partitions.add(partition);
+ // TODO should we just persist the takeover success on metakv to prevent any race between CC being active
+ // and the partition being active?
+ ReplicaPromotionMessage message =
+ new ReplicaPromotionMessage(appCtx.getTransactionSubsystem().getId(), partition);
+ INCMessageBroker messageBroker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ try {
+ messageBroker.sendMessageToPrimaryCC(message);
+ } catch (Exception e) {
+ //TODO halt
+ LOGGER.error("failed to notify CC of replica promotion");
+ }
}
@Override
public synchronized void release(int partition) throws HyracksDataException {
+ if (nodeNotReady("release")) {
+ throw new IllegalStateException("received release request while node is not ready");
+ }
if (!partitions.contains(partition)) {
return;
}
final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
- datasetLifecycleManager.flushDataset(appCtx.getReplicationManager().getReplicationStrategy());
+ IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+ datasetLifecycleManager.flushDataset(replicationStrategy);
closePartitionResources(partition);
final List<IPartitionReplica> partitionReplicas = getReplicas(partition);
for (IPartitionReplica replica : partitionReplicas) {
@@ -141,14 +164,15 @@
return replicaSyncLock;
}
- private void closePartitionResources(int partition) throws HyracksDataException {
+ public void closePartitionResources(int partition) throws HyracksDataException {
final PersistentLocalResourceRepository resourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
for (LocalResource resource : partitionResources.values()) {
- datasetLifecycleManager.close(resource.getPath());
+ datasetLifecycleManager.closeIfOpen(resource.getPath());
}
+ datasetLifecycleManager.closePartition(partition);
}
private boolean isSelf(ReplicaIdentifier id) {
@@ -157,6 +181,17 @@
int port = appConfig.getInt(NCConfig.Option.REPLICATION_LISTEN_PORT);
final InetSocketAddress replicaAddress = new InetSocketAddress(host, port);
- return id.equals(ReplicaIdentifier.of(id.getPartition(), replicaAddress));
+ return id.equals(ReplicaIdentifier.of(id.getPartition(), id.getNodeId(), replicaAddress));
+ }
+
+ private boolean nodeNotReady(String request) {
+ final NodeControllerService controllerService =
+ (NodeControllerService) appCtx.getServiceContext().getControllerService();
+ final NodeStatus nodeStatus = controllerService.getNodeStatus();
+ if (nodeStatus != NodeStatus.ACTIVE) {
+ LOGGER.warn("Ignoring request {}. Node is not ACTIVE yet. Current status: {}", request, nodeStatus);
+ return true;
+ }
+ return false;
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index f58f871..f19d896 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -38,6 +38,7 @@
public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
try {
+ //TODO do we need to promote here?
appContext.getReplicaManager().promote(partitionId);
SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
appContext.initializeMetadata(state == SystemState.PERMANENT_DATA_LOSS, partitionId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..cacb15a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@
import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,8 @@
@Override
public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
pendingStartupCompletionNodes.remove(nodeId);
- clusterManager.updateNodeState(nodeId, false, null);
+ //TODO on node failure, we need to set all of its active partitions to inactive
+ clusterManager.updateNodeState(nodeId, false, null, null);
if (nodeId.equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, false);
}
@@ -138,7 +136,8 @@
private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
nodeSecretsMap.put(nodeId, msg.getSecrets());
- List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+ List<INCLifecycleTask> tasks =
+ buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
@@ -157,7 +156,7 @@
return;
}
if (msg.isSuccess()) {
- clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
+ clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getPartitions());
if (msg.getNodeId().equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, true);
}
@@ -167,7 +166,8 @@
}
}
- protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+ protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
+ Set<Integer> activePartitions) {
LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
state);
final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +175,7 @@
case ACTIVE:
return buildActiveNCRegTasks(isMetadataNode);
case IDLE:
- return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+ return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
default:
return new ArrayList<>();
}
@@ -210,15 +210,18 @@
}
}
- protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
+ protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
+ Set<Integer> activePartitions) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
if (state == SystemState.CORRUPTED) {
- // need to perform local recovery for node partitions
- LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
- .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+ // need to perform local recovery for node active partitions
+ //TODO ensure recovery is done for all active partitions before the CheckpointTask since that
+ // might delete any old transaction log files
+ LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
tasks.add(rt);
}
+
if (replicationEnabled) {
tasks.add(new StartReplicationServiceTask());
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..c70b6bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.replication.message;
+import java.util.Set;
+
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@
private final boolean success;
private Throwable exception;
private final NcLocalCounters localCounters;
+ private Set<Integer> partitions;
- public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
+ public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters,
+ Set<Integer> partitions) {
this.nodeId = nodeId;
this.success = success;
this.localCounters = localCounters;
+ this.partitions = partitions;
}
@Override
@@ -63,6 +68,10 @@
return localCounters;
}
+ public Set<Integer> getPartitions() {
+ return partitions;
+ }
+
@Override
public MessageType getType() {
return MessageType.REGISTRATION_TASKS_RESULT;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..870413b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -37,25 +38,27 @@
public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
private static final Logger LOGGER = LogManager.getLogger();
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
protected final SystemState state;
protected final String nodeId;
protected final NodeStatus nodeStatus;
protected final Map<String, Object> secrets;
+ protected final Set<Integer> activePartitions;
public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
- Map<String, Object> secretsEphemeral) {
+ Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
this.state = state;
this.nodeId = nodeId;
this.nodeStatus = nodeStatus;
this.secrets = new HashMap<>(secretsEphemeral);
+ this.activePartitions = activePartitions;
}
public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
- Map<String, Object> secretsEphemeral) throws HyracksDataException {
+ Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException {
try {
- RegistrationTasksRequestMessage msg =
- new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
+ RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+ systemState, secretsEphemeral, activePartitions);
((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@
public Map<String, Object> getSecrets() {
return secrets;
}
+
+ /**
+ * @return the active partitions set supplied when this message was constructed; the stored reference
+ * is returned as-is, without copying
+ */
+ public Set<Integer> getActivePartitions() {
+ return activePartitions;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..7848849 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
package org.apache.asterix.app.replication.message;
import java.util.List;
+import java.util.Set;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@
}
NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
(NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
- NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+ Set<Integer> partitions = appCtx.getReplicaManager().getPartitions();
+ NCLifecycleTaskReportMessage result =
+ new NCLifecycleTaskReportMessage(nodeId, success, localCounter, partitions);
result.setException(exception);
try {
broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3571d88..76eb184 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -42,6 +42,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.asterix.active.ActivityState;
@@ -272,6 +273,7 @@
protected final ICcApplicationContext appCtx;
protected final SessionOutput sessionOutput;
protected final SessionConfig sessionConfig;
+ protected final ReentrantReadWriteLock compilationLock;
protected Dataverse activeDataverse;
protected final List<FunctionDecl> declaredFunctions;
protected final ILangCompilationProvider compilationProvider;
@@ -290,6 +292,7 @@
this.appCtx = appCtx;
this.lockManager = appCtx.getMetadataLockManager();
this.lockUtil = appCtx.getMetadataLockUtil();
+ this.compilationLock = appCtx.getCompilationLock();
this.statements = statements;
this.sessionOutput = output;
this.sessionConfig = output.config();
@@ -3367,12 +3370,14 @@
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() throws AlgebricksException {
+ compilationLock.readLock().lock();
lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
}
@Override
public void unlock() {
metadataProvider.getLocks().unlock();
+ compilationLock.readLock().unlock();
}
};
final IStatementCompiler compiler = () -> {
@@ -3962,6 +3967,7 @@
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() {
+ compilationLock.readLock().lock();
}
@Override
@@ -3969,6 +3975,7 @@
metadataProvider.getLocks().unlock();
// release external datasets' locks acquired during compilation of the query
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+ compilationLock.readLock().unlock();
}
};
final IStatementCompiler compiler = () -> {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..1f55631 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -36,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -299,8 +300,10 @@
final Map httpSecrets =
apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
: Collections.emptyMap();
+ Set<Integer> activePartitions = Arrays.stream(runtimeContext.getNodeProperties().getActivePartitions())
+ .map(Integer::parseInt).collect(Collectors.toSet());
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
- currentStatus, systemState, httpSecrets);
+ currentStatus, systemState, httpSecrets, activePartitions);
}
@Override
@@ -340,7 +343,11 @@
@Override
public IFileDeviceResolver getFileDeviceResolver() {
return (relPath, devices) -> {
- int ioDeviceIndex = Math.abs(StoragePathUtil.getPartitionNumFromRelativePath(relPath) % devices.size());
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(relPath);
+ if (partition < 0) {
+ return devices.get(0);
+ }
+ int ioDeviceIndex = Math.abs(partition % devices.size());
return devices.get(ioDeviceIndex);
};
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..370b330 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -200,7 +200,8 @@
private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
throws HyracksDataException {
- NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+ NCLifecycleTaskReportMessage msg =
+ new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), Collections.emptySet());
applicationContext.getNcLifecycleCoordinator().process(msg);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index b03af55..48bada3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -169,4 +169,8 @@
* @return the current datasets io stats
*/
StorageIOStats getDatasetsIOStats();
+
+ void closeIfOpen(String resourcePath) throws HyracksDataException;
+
+ void closePartition(int partitionId);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc99421..9c66acf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -33,6 +33,15 @@
this.ioDeviceNum = ioDeviceNum;
}
+ /**
+ * Builds a {@link ClusterPartition} through the three-argument constructor and then assigns its
+ * active node id, active flag and pending-activation flag.
+ *
+ * @param partitionId the partition id passed to the constructor
+ * @param nodeId the node id passed to the constructor
+ * @param ioDeviceNum the io device number passed to the constructor
+ * @param activeNodeId the value assigned to the new partition's active node id
+ * @param active the value assigned to the new partition's active flag
+ * @param pendingActivation the value assigned to the new partition's pending-activation flag
+ * @return the newly created partition
+ */
+ public static ClusterPartition of(int partitionId, String nodeId, int ioDeviceNum, String activeNodeId,
+ boolean active, boolean pendingActivation) {
+ final ClusterPartition created = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ created.pendingActivation = pendingActivation;
+ created.active = active;
+ created.activeNodeId = activeNodeId;
+ return created;
+ }
+
public int getPartitionId() {
return partitionId;
}
@@ -71,7 +80,10 @@
@Override
public ClusterPartition clone() {
- return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ clone.setActiveNodeId(activeNodeId);
+ clone.setActive(active);
+ return clone;
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..94f80cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@
* @param nodeId
* @param active
* @param ncLocalCounters
+ * @param partitions
* @throws HyracksDataException
*/
- void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
+ void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> partitions)
+ throws HyracksDataException;
/**
* Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..efc9b6a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,11 @@
STARTING_PARTITION_ID(
OptionTypes.INTEGER,
-1,
- "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
+ "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
+ ACTIVE_PARTITIONS(
+ OptionTypes.STRING_ARRAY,
+ new String[] {},
+ "List of partitions this node is currently the master of");
private final IOptionType type;
private final Object defaultValue;
@@ -95,7 +99,7 @@
@Override
public boolean hidden() {
- return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+ return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS;
}
}
@@ -115,4 +119,8 @@
public String getTxnLogDir() {
return accessor.getString(Option.TXN_LOG_DIR);
}
+
+ /**
+ * @return the value of the {@code ACTIVE_PARTITIONS} option as a string array; the option defaults to
+ * an empty array and is described as the list of partitions this node is currently the master of
+ */
+ public String[] getActivePartitions() {
+ return accessor.getStringArray(Option.ACTIVE_PARTITIONS);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5d1a8e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -132,6 +132,7 @@
}
stores.put(ncId, nodeStores);
nodePartitionsMap.put(ncId, nodePartitions);
+ LOGGER.info("Node partitions {}", nodeCfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS));
}
private void loadAsterixBuildProperties() throws AsterixException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b2f4034..4b05629 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -113,6 +113,8 @@
@Override
public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+ LOGGER.info("registering {} with dataset life cycle manager. Thread: {}", index,
+ Thread.currentThread().getStackTrace());
validateDatasetLifecycleManagerState();
int did = getDIDfromResourcePath(resourcePath);
LocalResource resource = resourceRepository.get(resourcePath);
@@ -141,6 +143,8 @@
@Override
public synchronized void unregister(String resourcePath) throws HyracksDataException {
+ LOGGER.info("unregister {} with dataset life cycle manager. Thread: {}", resourcePath,
+ Thread.currentThread().getStackTrace());
validateDatasetLifecycleManagerState();
int did = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -559,6 +563,51 @@
return stats;
}
+ /**
+ * Closes the index stored at {@code resourcePath} and removes it from its dataset's index list, if the
+ * index is currently known to this manager. Returns silently when either the dataset or the index is
+ * not registered.
+ *
+ * @param resourcePath the path of the index resource to close
+ * @throws HyracksDataException with {@code CANNOT_DROP_IN_USE_INDEX} if the index reference count is
+ * not zero or the partition's operation tracker reports active operations
+ */
+ @Override
+ public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
+ validateDatasetLifecycleManagerState();
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+ DatasetResource dsr = datasets.get(did);
+ IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+
+ if (dsr == null || iInfo == null) {
+ return;
+ }
+
+ PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+ if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+ if (LOGGER.isErrorEnabled()) {
+ // this branch is also reached with a null tracker when only the reference count is non-zero,
+ // so the tracker must not be dereferenced unconditionally while building the message
+ final String logMsg = String.format(
+ "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+ resourcePath, iInfo.getReferenceCount(),
+ opTracker == null ? 0 : opTracker.getNumActiveOperations());
+ LOGGER.error(logMsg);
+ }
+ throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+ StoragePathUtil.getIndexNameFromPath(resourcePath));
+ }
+
+ // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+ DatasetInfo dsInfo = dsr.getDatasetInfo();
+ dsInfo.waitForIO();
+ closeIndex(iInfo);
+ dsInfo.removeIndex(resourceID);
+ synchronized (dsInfo) {
+ // drop the dataset from the cache once nothing references it and it has no indexes left
+ if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+ && !dsInfo.isExternal()) {
+ removeDatasetFromCache(dsInfo.getDatasetID());
+ }
+ }
+ }
+
+ /**
+ * Invokes {@code removePartition(partitionId)} on every dataset resource currently held by this manager.
+ *
+ * @param partitionId the id of the partition to remove from each dataset resource
+ */
+ @Override
+ public synchronized void closePartition(int partitionId) {
+ datasets.values().forEach(resource -> resource.removePartition(partitionId));
+ }
+
private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
if (indexInfo.isOpen()) {
ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 54e1976..db9eabb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -182,4 +182,10 @@
public boolean isMetadataDataset() {
return MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID());
}
+
+ /**
+ * Removes the entry for {@code partitionId} from this dataset's primary operation trackers,
+ * component id generators and rate limiters.
+ * NOTE(review): assumes the three collections are maps keyed by partition id -- confirm.
+ *
+ * @param partitionId the id of the partition whose entries are removed
+ */
+ public void removePartition(int partitionId) {
+ datasetPrimaryOpTrackers.remove(partitionId);
+ datasetComponentIdGenerators.remove(partitionId);
+ datasetRateLimiters.remove(partitionId);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
index 7f448e3..249505b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
@@ -41,7 +41,7 @@
/**
* The resource partition
*/
- private final int partition;
+ private int partition;
private final IResource resource;
public DatasetLocalResource(int datasetId, int partition, IResource resource) {
@@ -68,6 +68,10 @@
resource.setPath(path);
}
+ /**
+ * Replaces the resource partition of this local resource.
+ *
+ * @param partition the new resource partition
+ */
+ public void setPartition(int partition) {
+ this.partition = partition;
+ }
+
@Override
public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
return resource.createInstance(ncServiceCtx);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index ad90814..72fb32c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.dataflow;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IMetadataLockManager;
@@ -149,4 +151,11 @@
* @return the adapter factory service
*/
IAdapterFactoryService getAdapterFactoryService();
+
+ /**
+ * Gets the compilation lock
+ *
+ * @return the compilation lock
+ */
+ ReentrantReadWriteLock getCompilationLock();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
index a8cd14f..46bca34 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
@@ -20,10 +20,12 @@
public class MetadataIndexImmutableProperties {
+ public static final int METADATA_PARTITION_ID = -1;
public static final int FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID = 52;
public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
public static final int METADATA_DATASETS_PARTITIONS = 1;
public static final int METADATA_DATASETS_COUNT = 15;
+ // public static final Map<Integer, Integer>
private final String indexName;
private final int datasetId;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
index c969777..1dfc2e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
@@ -24,4 +24,9 @@
public boolean isMatch(int datasetId) {
return true;
}
+
+ /**
+ * @return the name of this strategy, {@code "all"}
+ */
+ @Override
+ public String getName() {
+ return "all";
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
new file mode 100644
index 0000000..7d73e79
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllNonMetadataDatasetsReplicationStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+
+/**
+ * A replication strategy that matches every dataset that is not a metadata dataset.
+ */
+public class AllNonMetadataDatasetsReplicationStrategy implements IReplicationStrategy {
+
+ /**
+ * @param datasetId the id of the dataset to test
+ * @return true unless {@link MetadataIndexImmutableProperties#isMetadataDataset(int)} reports
+ * {@code datasetId} as a metadata dataset
+ */
+ @Override
+ public boolean isMatch(int datasetId) {
+ return !MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
+ }
+
+ /**
+ * @return the name of this strategy, {@code "non-metadata"}
+ */
+ @Override
+ public String getName() {
+ return "non-metadata";
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
index 0ad8be2..340bad4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -18,11 +18,15 @@
*/
package org.apache.asterix.common.replication;
-public interface IReplicationStrategy {
+import java.io.Serializable;
+
+public interface IReplicationStrategy extends Serializable {
/**
* @param datasetId
* @return True, if the dataset should be replicated. Otherwise false.
*/
boolean isMatch(int datasetId);
+
+ String getName();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index 2b97fe7..5278208 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -26,4 +26,9 @@
public boolean isMatch(int datasetId) {
return MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
}
+
+ /**
+ * @return the name of this strategy, {@code "metadata"}
+ */
+ @Override
+ public String getName() {
+ return "metadata";
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
index 8ffaa15..49083c8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -24,4 +24,9 @@
public boolean isMatch(int datasetId) {
return false;
}
+
+ /**
+ * @return the name of this strategy, {@code "none"}
+ */
+ @Override
+ public String getName() {
+ return "none";
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
index c4bb74c..95ea055 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -27,21 +27,29 @@
private final int partition;
private final String id;
private volatile InetSocketAddress location;
+ private final String nodeId;
+ private final int storagePartition;
- private ReplicaIdentifier(int partition, InetSocketAddress location) {
+ private ReplicaIdentifier(int partition, String nodeId, InetSocketAddress location) {
this.partition = partition;
this.location = location;
+ this.nodeId = nodeId;
+ storagePartition = partition;
id = partition + "@" + location.getHostString() + ":" + location.getPort();
}
- public static ReplicaIdentifier of(int partition, InetSocketAddress location) {
- return new ReplicaIdentifier(partition, location);
+ public static ReplicaIdentifier of(int partition, String nodeId, InetSocketAddress location) {
+ return new ReplicaIdentifier(partition, nodeId, location);
}
public int getPartition() {
return partition;
}
+ /**
+ * @return the storage partition of this replica; the constructor initializes it to the same value as
+ * the replica's partition
+ */
+ public int getStoragePartition() {
+ return storagePartition;
+ }
+
public InetSocketAddress getLocation() {
return location;
}
@@ -52,6 +60,10 @@
return location;
}
+ /**
+ * @return the node id this identifier was created with
+ */
+ public String getNodeId() {
+ return nodeId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index d803756..06650b0 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -78,7 +78,7 @@
}
public synchronized Optional<IPartitionReplica> getPartitionReplica(int partition) {
- return replicas.stream().filter(replica -> replica.getIdentifier().getPartition() == partition
+ return replicas.stream().filter(replica -> replica.getIdentifier().getStoragePartition() == partition
&& replica.getStatus() == IPartitionReplica.PartitionReplicaStatus.IN_SYNC).findAny();
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index dd953c4..aadea22 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -101,7 +101,8 @@
private void process(IReplicationJob job) {
try {
- if (skip(job)) {
+ DatasetResourceReference resourceRef = getJobDatasetResource(job);
+ if (skip(resourceRef)) {
return;
}
synchronized (transferLock) {
@@ -109,11 +110,11 @@
return;
}
final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
- final int indexPartition = getJobPartition(job);
+ final int indexPartition = resourceRef.getPartitionNum();
for (ReplicationDestination dest : destinations) {
try {
Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
- if (!partitionReplica.isPresent()) {
+ if (partitionReplica.isEmpty()) {
continue;
}
PartitionReplica replica = (PartitionReplica) partitionReplica.get();
@@ -129,23 +130,20 @@
}
}
- private boolean skip(IReplicationJob job) {
+ private boolean skip(DatasetResourceReference indexFileRef) {
+ return indexFileRef == null || !replicationStrategy.isMatch(indexFileRef.getDatasetId());
+ }
+
+ private DatasetResourceReference getJobDatasetResource(IReplicationJob job) {
try {
- final String fileToReplicate = job.getAnyFile();
- final Optional<DatasetResourceReference> indexFileRefOpt =
- resourceRepository.getLocalResourceReference(fileToReplicate);
- if (!indexFileRefOpt.isPresent()) {
- LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
- return true;
- }
- return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+ return resourceRepository.getLocalResourceReference(job.getAnyFile()).orElse(null);
} catch (HyracksDataException e) {
throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
}
}
- private int getJobPartition(IReplicationJob job) {
- return ResourceReference.of(job.getAnyFile()).getPartitionNum();
+ private ResourceReference getJobPartition(IReplicationJob job) {
+ return ResourceReference.of(job.getAnyFile());
}
private void closeChannels() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6a23ae6..a805d82 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -128,6 +128,7 @@
public void replicate(ILogRecord logRecord) throws InterruptedException {
if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
synchronized (destinations) {
+ //TODO filter destinations
ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 767eb76..e0f9a8f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -59,6 +59,8 @@
PersistentLocalResourceRepository resRepo =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
final IIOManager ioManager = appCtx.getIoManager();
+ //TODO this should consider replication strategy to avoid checkpointing metadata datasets
+ // when only other datasets should be checkpointed
final Collection<LocalResource> partitionResources = resRepo.getPartitionResources(partition).values();
final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
for (LocalResource ls : partitionResources) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index cff12de..91bc35b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -48,11 +48,12 @@
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException {
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+ //TODO do we need to sync such tasks?
localResourceRepository.cleanup(partition);
- final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
- final List<String> partitionResources =
- localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
- .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+ //TODO need to include checksum of each file too
+ IReplicationStrategy strategy = appCtx.getReplicationManager().getReplicationStrategy();
+ final List<String> partitionResources = localResourceRepository.getPartitionReplicatedFiles(partition, strategy)
+ .stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
final PartitionResourcesListResponse response =
new PartitionResourcesListResponse(partition, partitionResources);
ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
new file mode 100644
index 0000000..e73bcb2
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ReplicaPromotionMessage implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private int partition;
+
+ public ReplicaPromotionMessage(String nodeId, int partition) {
+ this.nodeId = nodeId;
+ this.partition = partition;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ LOGGER.info("Partition {} master changed, new master {}", partition, nodeId);
+ IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
+ clusterStateManager.updateClusterPartition(partition, nodeId, true);
+ clusterStateManager.refreshState();
+ }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index b47fd39..a01a2ba 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -26,7 +26,6 @@
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
@@ -49,14 +48,13 @@
}
public void sync() throws IOException {
- final int partition = replica.getIdentifier().getPartition();
+ final int partition = replica.getIdentifier().getStoragePartition();
final Set<String> replicaFiles = getReplicaFiles(partition);
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
- final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
- final Set<String> masterFiles =
- localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
- .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+ final Set<String> masterFiles = localResourceRepository
+ .getPartitionReplicatedFiles(partition, appCtx.getReplicationManager().getReplicationStrategy())
+ .stream().map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
// find files on master and not on replica
final List<String> replicaMissingFiles =
masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 261236c..313263e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -60,15 +60,15 @@
private void syncFiles() throws IOException {
final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
- // flush replicated dataset to generate disk component for any remaining in-memory components
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+ // flush replicated dataset to generate disk component for any remaining in-memory components
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
waitForReplicatedDatasetsIO();
fileSync.sync();
}
private void checkpointReplicaIndexes() throws IOException {
- final int partition = replica.getIdentifier().getPartition();
+ final int partition = replica.getIdentifier().getStoragePartition();
CheckpointPartitionIndexesTask task =
new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition));
ReplicationProtocol.sendTo(replica, task);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 9099c88..2576c4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -19,6 +19,7 @@
package org.apache.asterix.runtime.utils;
import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.asterix.common.api.IConfigValidator;
@@ -101,6 +102,8 @@
private final IConfigValidator configValidator;
private final IAdapterFactoryService adapterFactoryService;
+ private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
+
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
@@ -314,4 +317,9 @@
public IAdapterFactoryService getAdapterFactoryService() {
return adapterFactoryService;
}
+
+ @Override
+ public ReentrantReadWriteLock getCompilationLock() {
+ return compilationLock;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..fc99792 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -24,12 +24,14 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
@@ -83,6 +85,8 @@
@Override
public void setCcAppCtx(ICcApplicationContext appCtx) {
this.appCtx = appCtx;
+ //TODO we need to maintain the node2PartitionsMap in the same state as before a failover and only change
+ // a partition's current active node
node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName();
@@ -139,20 +143,33 @@
}
@Override
- public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
+ public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters,
+ Set<Integer> nodePartitions) {
if (active) {
updateClusterCounters(nodeId, localCounters);
participantNodes.add(nodeId);
} else {
participantNodes.remove(nodeId);
}
- ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
- // if this isn't a storage node, it will not have cluster partitions
+ // ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
if (nodePartitions != null) {
- for (ClusterPartition p : nodePartitions) {
- updateClusterPartition(p.getPartitionId(), nodeId, active);
+ for (Integer partitionId : nodePartitions) {
+ updateClusterPartition(partitionId, nodeId, active);
+ }
+ } else {
+ List<ClusterPartition> nodeActivePartitions = clusterPartitions.values().stream()
+ .filter(clusterPartition -> clusterPartition.getActiveNodeId().equals(nodeId))
+ .collect(Collectors.toList());
+ for (ClusterPartition partition : nodeActivePartitions) {
+ updateClusterPartition(partition.getPartitionId(), nodeId, active);
}
}
+ // if this isn't a storage node, it will not have cluster partitions
+ // if (nodePartitions != null) {
+ // for (ClusterPartition p : nodePartitions) {
+ // updateClusterPartition(p.getPartitionId(), nodeId, active);
+ // }
+ // }
}
@Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 29dedf7..556ab6a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -248,7 +248,7 @@
}
}
- private static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath)
+ public static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath)
throws HyracksDataException {
String fileName = resourcePath + File.separator + StorageConstants.METADATA_FILE_NAME;
return ioManager.resolve(fileName);
@@ -603,4 +603,8 @@
private static boolean isComponentFile(File indexDir, String fileName) {
return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
}
+
+ public Path[] getStorageRoots() {
+ return storageRoots;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
index 0a0e026..c7be4ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
@@ -59,7 +59,7 @@
@Override
public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException {
if (cached == null) {
- cached = new FileReference(ioManager.getIODevices().get(ioDevice), getPath());
+ cached = super.getFileReference(ioManager);
}
return cached;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index f54a62c..5166970 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -184,6 +184,9 @@
public synchronized void failNode(String nodeId) throws HyracksException {
NodeControllerState state = nodeRegistry.get(nodeId);
+ if (state == null) {
+ return;
+ }
Set<JobId> affectedJobIds = state.getActiveJobIds();
// Removes the node from node map.
nodeRegistry.remove(nodeId);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12503
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I3b8de5d3354be36768e3c9da116c3c96a552a867
Gerrit-Change-Number: 12503
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: newchange