You are viewing a plain-text version of this content. The canonical link for it is here (the link itself is not preserved in this plain-text copy).
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2021/07/20 00:16:33 UTC
Change in asterixdb[master]: PLEASE EDIT to provide a meaningful commit message!
From Murtadha Hubail <mh...@apache.org>:
Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404 )
Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................
PLEASE EDIT to provide a meaningful commit message!
The following commits from your working branch will be included:
commit 90c270d5aa339b2b8a53be2187d8e6f688e243a2
Author: Murtadha Hubail <mu...@couchbase.com>
Date: Wed Jul 14 03:53:03 2021 +0300
WIP: Allow node partitions to be specified in config
Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
A asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
30 files changed, 298 insertions(+), 59 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/04/12404/1
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d7d5994..70b46e0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.ReplicationStrategyFactory;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.common.storage.ResourceStorageStats;
@@ -135,6 +136,8 @@
final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
replicaJson.put("location", toHostPort(replica.getIdentifier().getLocation()));
replicaJson.put("status", replica.getStatus().toString());
+ replicaJson.put("nodeId", replica.getIdentifier().getNodeId());
+ replicaJson.put("strategy", replica.getIdentifier().getStrategy().getName());
replicasArray.add(replicaJson);
}
partitionJson.set("replicas", replicasArray);
@@ -167,11 +170,14 @@
final String partition = request.getParameter("partition");
final String host = request.getParameter("host");
final String port = request.getParameter("port");
- if (partition == null || host == null || port == null) {
+ final String nodeId = request.getParameter("nodeId");
+ final String strategy = request.getParameter("strategy");
+ if (partition == null || host == null || port == null || strategy == null) {
return null;
}
- final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
- return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
+ final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.parseInt(port));
+ return ReplicaIdentifier.of(Integer.parseInt(partition), nodeId, replicaAddress,
+ ReplicationStrategyFactory.create(strategy));
}
private void processPromote(IServletRequest request, IServletResponse response) throws HyracksDataException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..53d0973 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -38,7 +38,6 @@
import org.apache.asterix.common.api.IPropertiesFactory;
import org.apache.asterix.common.api.IReceptionist;
import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.BuildProperties;
import org.apache.asterix.common.config.CompilerProperties;
@@ -225,9 +224,9 @@
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
final String nodeId = getServiceContext().getNodeId();
- final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+ final String[] nodePartitions = nodeProperties.getActivePartitions();
final Set<Integer> nodePartitionsIds =
- Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+ Arrays.stream(nodePartitions).map(Integer::parseInt).collect(Collectors.toSet());
replicaManager = new ReplicaManager(this, nodePartitionsIds);
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index ad70cf4..48c0a36 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -30,11 +30,13 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.ReplicaPromotionMessage;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -89,8 +91,10 @@
@Override
public synchronized void removeReplica(ReplicaIdentifier id) {
+ //TODO return illegal state?
if (!replicas.containsKey(id)) {
- throw new IllegalStateException("replica with id(" + id + ") does not exist");
+ return;
+ // throw new IllegalStateException("replica with id(" + id + ") does not exist");
}
PartitionReplica replica = replicas.remove(id);
appCtx.getReplicationManager().unregister(replica);
@@ -119,6 +123,17 @@
final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
partitions.add(partition);
+ // TODO should we just persist the takeover success on metakv to prevent any race between CC being active
+ // and the partition being active?
+ ReplicaPromotionMessage message =
+ new ReplicaPromotionMessage(appCtx.getTransactionSubsystem().getId(), partition);
+ INCMessageBroker messageBroker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ try {
+ messageBroker.sendMessageToPrimaryCC(message);
+ } catch (Exception e) {
+ //TODO should we persist partition takeover success in metakv?
+ LOGGER.error("failed to notify CC of replica promotion");
+ }
}
@Override
@@ -147,8 +162,9 @@
final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
for (LocalResource resource : partitionResources.values()) {
- datasetLifecycleManager.close(resource.getPath());
+ datasetLifecycleManager.closeIfOpen(resource.getPath());
}
+ datasetLifecycleManager.closePartition(partition);
}
private boolean isSelf(ReplicaIdentifier id) {
@@ -157,6 +173,6 @@
int port = appConfig.getInt(NCConfig.Option.REPLICATION_LISTEN_PORT);
final InetSocketAddress replicaAddress = new InetSocketAddress(host, port);
- return id.equals(ReplicaIdentifier.of(id.getPartition(), replicaAddress));
+ return id.equals(ReplicaIdentifier.of(id.getPartition(), id.getNodeId(), replicaAddress, id.getStrategy()));
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..cacb15a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@
import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,8 @@
@Override
public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
pendingStartupCompletionNodes.remove(nodeId);
- clusterManager.updateNodeState(nodeId, false, null);
+ //TODO on node failure, we need to set all of its active partitions to inactive
+ clusterManager.updateNodeState(nodeId, false, null, null);
if (nodeId.equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, false);
}
@@ -138,7 +136,8 @@
private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
nodeSecretsMap.put(nodeId, msg.getSecrets());
- List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+ List<INCLifecycleTask> tasks =
+ buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
@@ -157,7 +156,7 @@
return;
}
if (msg.isSuccess()) {
- clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
+ clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getPartitions());
if (msg.getNodeId().equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, true);
}
@@ -167,7 +166,8 @@
}
}
- protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+ protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
+ Set<Integer> activePartitions) {
LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
state);
final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +175,7 @@
case ACTIVE:
return buildActiveNCRegTasks(isMetadataNode);
case IDLE:
- return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+ return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
default:
return new ArrayList<>();
}
@@ -210,15 +210,18 @@
}
}
- protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
+ protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
+ Set<Integer> activePartitions) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
if (state == SystemState.CORRUPTED) {
- // need to perform local recovery for node partitions
- LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
- .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+ // need to perform local recovery for node active partitions
+ //TODO ensure recovery is done for all active partitions before the CheckpointTask since that
+ // might delete any old transaction log files
+ LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
tasks.add(rt);
}
+
if (replicationEnabled) {
tasks.add(new StartReplicationServiceTask());
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..c70b6bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.replication.message;
+import java.util.Set;
+
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@
private final boolean success;
private Throwable exception;
private final NcLocalCounters localCounters;
+ private Set<Integer> partitions;
- public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
+ public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters,
+ Set<Integer> partitions) {
this.nodeId = nodeId;
this.success = success;
this.localCounters = localCounters;
+ this.partitions = partitions;
}
@Override
@@ -63,6 +68,10 @@
return localCounters;
}
+ public Set<Integer> getPartitions() {
+ return partitions;
+ }
+
@Override
public MessageType getType() {
return MessageType.REGISTRATION_TASKS_RESULT;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..870413b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -37,25 +38,27 @@
public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
private static final Logger LOGGER = LogManager.getLogger();
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
protected final SystemState state;
protected final String nodeId;
protected final NodeStatus nodeStatus;
protected final Map<String, Object> secrets;
+ protected final Set<Integer> activePartitions;
public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
- Map<String, Object> secretsEphemeral) {
+ Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
this.state = state;
this.nodeId = nodeId;
this.nodeStatus = nodeStatus;
this.secrets = new HashMap<>(secretsEphemeral);
+ this.activePartitions = activePartitions;
}
public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
- Map<String, Object> secretsEphemeral) throws HyracksDataException {
+ Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException {
try {
- RegistrationTasksRequestMessage msg =
- new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
+ RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+ systemState, secretsEphemeral, activePartitions);
((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@
public Map<String, Object> getSecrets() {
return secrets;
}
+
+ public Set<Integer> getActivePartitions() {
+ return activePartitions;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..7848849 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
package org.apache.asterix.app.replication.message;
import java.util.List;
+import java.util.Set;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@
}
NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
(NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
- NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+ Set<Integer> partitions = appCtx.getReplicaManager().getPartitions();
+ NCLifecycleTaskReportMessage result =
+ new NCLifecycleTaskReportMessage(nodeId, success, localCounter, partitions);
result.setException(exception);
try {
broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3571d88..76eb184 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -42,6 +42,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.asterix.active.ActivityState;
@@ -272,6 +273,7 @@
protected final ICcApplicationContext appCtx;
protected final SessionOutput sessionOutput;
protected final SessionConfig sessionConfig;
+ protected final ReentrantReadWriteLock compilationLock;
protected Dataverse activeDataverse;
protected final List<FunctionDecl> declaredFunctions;
protected final ILangCompilationProvider compilationProvider;
@@ -290,6 +292,7 @@
this.appCtx = appCtx;
this.lockManager = appCtx.getMetadataLockManager();
this.lockUtil = appCtx.getMetadataLockUtil();
+ this.compilationLock = appCtx.getCompilationLock();
this.statements = statements;
this.sessionOutput = output;
this.sessionConfig = output.config();
@@ -3367,12 +3370,14 @@
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() throws AlgebricksException {
+ compilationLock.readLock().lock();
lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
}
@Override
public void unlock() {
metadataProvider.getLocks().unlock();
+ compilationLock.readLock().unlock();
}
};
final IStatementCompiler compiler = () -> {
@@ -3962,6 +3967,7 @@
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() {
+ compilationLock.readLock().lock();
}
@Override
@@ -3969,6 +3975,7 @@
metadataProvider.getLocks().unlock();
// release external datasets' locks acquired during compilation of the query
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+ compilationLock.readLock().unlock();
}
};
final IStatementCompiler compiler = () -> {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..89f7741 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -36,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -299,8 +300,10 @@
final Map httpSecrets =
apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
: Collections.emptyMap();
+ Set<Integer> activePartitions = Arrays.stream(runtimeContext.getNodeProperties().getActivePartitions())
+ .map(Integer::parseInt).collect(Collectors.toSet());
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
- currentStatus, systemState, httpSecrets);
+ currentStatus, systemState, httpSecrets, activePartitions);
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..370b330 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -200,7 +200,8 @@
private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
throws HyracksDataException {
- NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+ NCLifecycleTaskReportMessage msg =
+ new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), Collections.emptySet());
applicationContext.getNcLifecycleCoordinator().process(msg);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index b03af55..48bada3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -169,4 +169,8 @@
* @return the current datasets io stats
*/
StorageIOStats getDatasetsIOStats();
+
+ void closeIfOpen(String resourcePath) throws HyracksDataException;
+
+ void closePartition(int partitionId);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc99421..b98cbe5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -71,7 +71,10 @@
@Override
public ClusterPartition clone() {
- return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ clone.setActiveNodeId(activeNodeId);
+ clone.setActive(active);
+ return clone;
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..94f80cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@
* @param nodeId
* @param active
* @param ncLocalCounters
+ * @param partitions
* @throws HyracksDataException
*/
- void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
+ void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> partitions)
+ throws HyracksDataException;
/**
* Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..efc9b6a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,11 @@
STARTING_PARTITION_ID(
OptionTypes.INTEGER,
-1,
- "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
+ "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
+ ACTIVE_PARTITIONS(
+ OptionTypes.STRING_ARRAY,
+ new String[] {},
+ "List of partitions this node is currently the master of");
private final IOptionType type;
private final Object defaultValue;
@@ -95,7 +99,7 @@
@Override
public boolean hidden() {
- return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+ return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS;
}
}
@@ -115,4 +119,8 @@
public String getTxnLogDir() {
return accessor.getString(Option.TXN_LOG_DIR);
}
+
+ public String[] getActivePartitions() {
+ return accessor.getStringArray(Option.ACTIVE_PARTITIONS);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5d1a8e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -132,6 +132,7 @@
}
stores.put(ncId, nodeStores);
nodePartitionsMap.put(ncId, nodePartitions);
+ LOGGER.info("Node partitions {}", nodeCfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS));
}
private void loadAsterixBuildProperties() throws AsterixException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b2f4034..00a30e5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -559,6 +559,51 @@
return stats;
}
+ @Override
+ public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
+ validateDatasetLifecycleManagerState();
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+ DatasetResource dsr = datasets.get(did);
+ IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+
+ if (dsr == null || iInfo == null) {
+ return;
+ }
+
+ PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+ if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+ if (LOGGER.isErrorEnabled()) {
+ final String logMsg = String.format(
+ "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+ resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+ LOGGER.error(logMsg);
+ }
+ throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+ StoragePathUtil.getIndexNameFromPath(resourcePath));
+ }
+
+ // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+ DatasetInfo dsInfo = dsr.getDatasetInfo();
+ dsInfo.waitForIO();
+ closeIndex(iInfo);
+ dsInfo.removeIndex(resourceID);
+ synchronized (dsInfo) {
+ if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+ && !dsInfo.isExternal()) {
+ removeDatasetFromCache(dsInfo.getDatasetID());
+ }
+ }
+ }
+
+ @Override
+ public synchronized void closePartition(int partitionId) {
+ for (DatasetResource ds : datasets.values()) {
+ ds.removePartition(partitionId);
+ }
+ }
+
private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
if (indexInfo.isOpen()) {
ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 54e1976..db9eabb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -182,4 +182,10 @@
public boolean isMetadataDataset() {
return MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID());
}
+
+ /**
+ * Forgets the rate limiter, the component id generator and the primary operation tracker kept for
+ * {@code partitionId}.
+ */
+ public void removePartition(int partitionId) {
+ datasetRateLimiters.remove(partitionId);
+ datasetComponentIdGenerators.remove(partitionId);
+ datasetPrimaryOpTrackers.remove(partitionId);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index ad90814..72fb32c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.dataflow;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IMetadataLockManager;
@@ -149,4 +151,11 @@
* @return the adapter factory service
*/
IAdapterFactoryService getAdapterFactoryService();
+
+ /**
+ * Gets the compilation lock.
+ * <p>
+ * NOTE(review): presumably this lock coordinates other work with query compilation; this change does not
+ * define which code paths take the read lock and which take the write lock. Document that protocol
+ * alongside its callers.
+ *
+ * @return the compilation lock
+ */
+ ReentrantReadWriteLock getCompilationLock();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
index c969777..1dfc2e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
@@ -24,4 +24,9 @@
public boolean isMatch(int datasetId) {
return true;
}
+
+ // "all": every dataset matches this strategy (isMatch always returns true).
+ @Override
+ public String getName() {
+ return "all";
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
index 0ad8be2..a74b621 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -25,4 +25,6 @@
* @return True, if the dataset should be replicated. Otherwise false.
*/
boolean isMatch(int datasetId);
+
+ /**
+ * Returns the short name of this strategy.
+ * <p>
+ * The name is reported by the storage API and embedded in replica identifier ids. Presumably it must be
+ * accepted by {@code ReplicationStrategyFactory.create} so that it round-trips; confirm.
+ *
+ * @return the strategy name
+ */
+ String getName();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index 2b97fe7..5278208 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -26,4 +26,9 @@
public boolean isMatch(int datasetId) {
return MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
}
+
+ // "metadata": only metadata datasets match this strategy (see isMatch).
+ @Override
+ public String getName() {
+ return "metadata";
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
index 8ffaa15..49083c8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -24,4 +24,9 @@
public boolean isMatch(int datasetId) {
return false;
}
+
+ // "none": no dataset matches this strategy (isMatch always returns false).
+ @Override
+ public String getName() {
+ return "none";
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
index c4bb74c..af285be 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -20,22 +20,28 @@
import java.net.InetSocketAddress;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.hyracks.util.NetworkUtil;
public class ReplicaIdentifier {
private final int partition;
private final String id;
+ private final IReplicationStrategy strategy;
private volatile InetSocketAddress location;
+ private final String nodeId;
- private ReplicaIdentifier(int partition, InetSocketAddress location) {
+ private ReplicaIdentifier(int partition, String nodeId, InetSocketAddress location, IReplicationStrategy strategy) {
this.partition = partition;
this.location = location;
- id = partition + "@" + location.getHostString() + ":" + location.getPort();
+ this.nodeId = nodeId;
+ this.strategy = strategy;
+ // The id ends with the strategy name, so strategy must be non-null. nodeId is not part of the id.
+ // NOTE(review): if equals/hashCode are based on id (not visible here), identifiers differing only in nodeId
+ // compare equal while identifiers differing only in strategy do not; confirm this is intended.
+ id = partition + "@" + location.getHostString() + ":" + location.getPort() + ":" + strategy.getName();
}
- public static ReplicaIdentifier of(int partition, InetSocketAddress location) {
- return new ReplicaIdentifier(partition, location);
+ /**
+ * Creates an identifier for a replica of {@code partition} reachable at {@code location}.
+ *
+ * @param partition the replicated partition id
+ * @param nodeId the id of the node hosting the replica; not validated here and may be null
+ * @param location the network address of the replica
+ * @param strategy the replication strategy of the replica; must not be null
+ */
+ public static ReplicaIdentifier of(int partition, String nodeId, InetSocketAddress location,
+ IReplicationStrategy strategy) {
+ return new ReplicaIdentifier(partition, nodeId, location, strategy);
}
public int getPartition() {
@@ -52,6 +58,14 @@
return location;
}
+ /**
+ * @return the id of the node hosting the replica; may be null (e.g. when the identifier was built from a
+ * storage API request without a nodeId parameter)
+ */
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return the replication strategy of this replica; its name is part of this identifier's id
+ */
+ public IReplicationStrategy getStrategy() {
+ return strategy;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index dd953c4..9bf0337 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -101,7 +101,8 @@
private void process(IReplicationJob job) {
try {
- if (skip(job)) {
+ DatasetResourceReference resourceRef = getJobDatasetResource(job);
+ if (skip(resourceRef)) {
return;
}
synchronized (transferLock) {
@@ -109,15 +110,17 @@
return;
}
final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
- final int indexPartition = getJobPartition(job);
+ final int indexPartition = resourceRef.getPartitionNum();
for (ReplicationDestination dest : destinations) {
try {
Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
- if (!partitionReplica.isPresent()) {
+ if (partitionReplica.isEmpty()) {
continue;
}
PartitionReplica replica = (PartitionReplica) partitionReplica.get();
- synchronizer.sync(replica);
+ if (replica.getIdentifier().getStrategy().isMatch(resourceRef.getDatasetId())) {
+ synchronizer.sync(replica);
+ }
} catch (Exception e) {
handleFailure(dest, e);
}
@@ -129,23 +132,20 @@
}
}
- private boolean skip(IReplicationJob job) {
+ /**
+ * @return true if the job's index has no dataset resource reference, or if its dataset does not match this
+ * manager's replicationStrategy
+ */
+ private boolean skip(DatasetResourceReference indexFileRef) {
+ return indexFileRef == null || !replicationStrategy.isMatch(indexFileRef.getDatasetId());
+ }
+
+ /**
+ * Resolves the dataset resource reference of the file replicated by {@code job}.
+ *
+ * @return the reference, or {@code null} (after logging a warning) if the local resource repository has no
+ * reference for the file
+ * @throws IllegalStateException if the resource lookup fails
+ */
+ private DatasetResourceReference getJobDatasetResource(IReplicationJob job) {
try {
- final String fileToReplicate = job.getAnyFile();
- final Optional<DatasetResourceReference> indexFileRefOpt =
- resourceRepository.getLocalResourceReference(fileToReplicate);
- if (!indexFileRefOpt.isPresent()) {
- LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
- return true;
- }
- return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+ final String fileToReplicate = job.getAnyFile();
+ final DatasetResourceReference indexFileRef =
+ resourceRepository.getLocalResourceReference(fileToReplicate).orElse(null);
+ if (indexFileRef == null) {
+ // A null result makes the caller skip the job; log why instead of dropping it silently.
+ LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
+ }
+ return indexFileRef;
} catch (HyracksDataException e) {
throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
}
}
- private int getJobPartition(IReplicationJob job) {
- return ResourceReference.of(job.getAnyFile()).getPartitionNum();
+ // NOTE(review): process() now takes the partition from the DatasetResourceReference of the job, and no other
+ // caller is visible in this change. The name no longer matches the return type (a ResourceReference, not a
+ // partition number); consider removing this method or renaming it (e.g. getJobResourceReference).
+ private ResourceReference getJobPartition(IReplicationJob job) {
+ return ResourceReference.of(job.getAnyFile());
}
private void closeChannels() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6a23ae6..a805d82 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -128,6 +128,7 @@
public void replicate(ILogRecord logRecord) throws InterruptedException {
if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
synchronized (destinations) {
+ //TODO filter destinations
ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
new file mode 100644
index 0000000..e73bcb2
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Notifies the cluster controller that {@code nodeId} is now the active (master) node of {@code partition}.
+ * <p>
+ * When handled, the partition is marked active on that node and the cluster state is refreshed.
+ */
+public class ReplicaPromotionMessage implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final int partition;
+
+ /**
+ * @param nodeId the id of the node that now hosts the active copy of {@code partition}
+ * @param partition the promoted partition id
+ */
+ public ReplicaPromotionMessage(String nodeId, int partition) {
+ this.nodeId = nodeId;
+ this.partition = partition;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ LOGGER.info("Partition {} master changed, new master {}", partition, nodeId);
+ IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
+ clusterStateManager.updateClusterPartition(partition, nodeId, true);
+ clusterStateManager.refreshState();
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 9099c88..2576c4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -19,6 +19,7 @@
package org.apache.asterix.runtime.utils;
import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.asterix.common.api.IConfigValidator;
@@ -101,6 +102,8 @@
private final IConfigValidator configValidator;
private final IAdapterFactoryService adapterFactoryService;
+ private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
+
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
@@ -314,4 +317,9 @@
public IAdapterFactoryService getAdapterFactoryService() {
return adapterFactoryService;
}
+
+ /**
+ * Returns the fair {@link ReentrantReadWriteLock} created once with this context (the compilationLock field);
+ * every call returns the same instance.
+ */
+ @Override
+ public ReentrantReadWriteLock getCompilationLock() {
+ return compilationLock;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..fc99792 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -24,12 +24,14 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
@@ -83,6 +85,8 @@
@Override
public void setCcAppCtx(ICcApplicationContext appCtx) {
this.appCtx = appCtx;
+ //TODO we need to keep node2PartitionsMap in the same state as before a failover and only change
+ // a partition's current active node
node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName();
@@ -139,20 +143,33 @@
}
@Override
- public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
+ /**
+ * Records that {@code nodeId} became active or inactive and updates the state of its partitions.
+ *
+ * @param nodePartitions the partitions reported by the node; when {@code null} (e.g. on node failure), the
+ * partitions whose current active node is {@code nodeId} are updated instead
+ */
+ public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters,
+ Set<Integer> nodePartitions) {
if (active) {
updateClusterCounters(nodeId, localCounters);
participantNodes.add(nodeId);
} else {
participantNodes.remove(nodeId);
}
- ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
- // if this isn't a storage node, it will not have cluster partitions
if (nodePartitions != null) {
- for (ClusterPartition p : nodePartitions) {
- updateClusterPartition(p.getPartitionId(), nodeId, active);
+ for (Integer partitionId : nodePartitions) {
+ updateClusterPartition(partitionId, nodeId, active);
+ }
+ } else {
+ // No explicit partition list: use the partitions currently active on this node (none for a node that
+ // is not a storage node). nodeId.equals(...) tolerates partitions without an active node.
+ List<ClusterPartition> nodeActivePartitions = clusterPartitions.values().stream()
+ .filter(clusterPartition -> nodeId.equals(clusterPartition.getActiveNodeId()))
+ .collect(Collectors.toList());
+ for (ClusterPartition partition : nodeActivePartitions) {
+ updateClusterPartition(partition.getPartitionId(), nodeId, active);
}
}
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
index 0a0e026..c7be4ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
@@ -59,7 +59,7 @@
@Override
public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException {
if (cached == null) {
- cached = new FileReference(ioManager.getIODevices().get(ioDevice), getPath());
+ // Resolved once and memoized: later calls return the cached reference even if a different ioManager
+ // is passed.
+ // NOTE(review): this no longer selects ioManager.getIODevices().get(ioDevice) explicitly; confirm that
+ // super.getFileReference resolves a mapped split to the same IO device.
+ cached = super.getFileReference(ioManager);
}
return cached;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index f54a62c..5166970 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -184,6 +184,9 @@
public synchronized void failNode(String nodeId) throws HyracksException {
NodeControllerState state = nodeRegistry.get(nodeId);
+ if (state == null) {
+ return;
+ }
Set<JobId> affectedJobIds = state.getActiveJobIds();
// Removes the node from node map.
nodeRegistry.remove(nodeId);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
Gerrit-Change-Number: 12404
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: newchange
Change in asterixdb[master]: Dynamic Partitions Topology PoC
Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Murtadha Hubail <mh...@apache.org>:
Murtadha Hubail has abandoned this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404 )
Change subject: Dynamic Partitions Topology PoC
......................................................................
Abandoned
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
Gerrit-Change-Number: 12404
Gerrit-PatchSet: 3
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-Reviewer: Ali Alsuliman <al...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: abandon
Change in asterixdb[master]: PLEASE EDIT to provide a meaningful commit message!
Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Murtadha Hubail <mh...@apache.org>:
Murtadha Hubail has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404 )
Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................
PLEASE EDIT to provide a meaningful commit message!
The following commits from your working branch will be included:
commit 90c270d5aa339b2b8a53be2187d8e6f688e243a2
Author: Murtadha Hubail <mu...@couchbase.com>
Date: Wed Jul 14 03:53:03 2021 +0300
WIP: Allow node partitions to be specified in config
Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
A asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
30 files changed, 298 insertions(+), 59 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/04/12404/1
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d7d5994..70b46e0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.ReplicationStrategyFactory;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.common.storage.ResourceStorageStats;
@@ -135,6 +136,8 @@
final ObjectNode replicaJson = OBJECT_MAPPER.createObjectNode();
replicaJson.put("location", toHostPort(replica.getIdentifier().getLocation()));
replicaJson.put("status", replica.getStatus().toString());
+ replicaJson.put("nodeId", replica.getIdentifier().getNodeId());
+ replicaJson.put("strategy", replica.getIdentifier().getStrategy().getName());
replicasArray.add(replicaJson);
}
partitionJson.set("replicas", replicasArray);
@@ -167,11 +170,14 @@
final String partition = request.getParameter("partition");
final String host = request.getParameter("host");
final String port = request.getParameter("port");
- if (partition == null || host == null || port == null) {
+ final String nodeId = request.getParameter("nodeId");
+ final String strategy = request.getParameter("strategy");
+ if (partition == null || host == null || port == null || strategy == null) {
return null;
}
- final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
- return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
+ final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.parseInt(port));
+ return ReplicaIdentifier.of(Integer.parseInt(partition), nodeId, replicaAddress,
+ ReplicationStrategyFactory.create(strategy));
}
private void processPromote(IServletRequest request, IServletResponse response) throws HyracksDataException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..53d0973 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -38,7 +38,6 @@
import org.apache.asterix.common.api.IPropertiesFactory;
import org.apache.asterix.common.api.IReceptionist;
import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.BuildProperties;
import org.apache.asterix.common.config.CompilerProperties;
@@ -225,9 +224,9 @@
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
final String nodeId = getServiceContext().getNodeId();
- final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+ final String[] nodePartitions = nodeProperties.getActivePartitions();
final Set<Integer> nodePartitionsIds =
- Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+ Arrays.stream(nodePartitions).map(Integer::parseInt).collect(Collectors.toSet());
replicaManager = new ReplicaManager(this, nodePartitionsIds);
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index ad70cf4..48c0a36 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -30,11 +30,13 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.ReplicaPromotionMessage;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -89,8 +91,10 @@
@Override
public synchronized void removeReplica(ReplicaIdentifier id) {
+ //TODO return illegal state?
if (!replicas.containsKey(id)) {
- throw new IllegalStateException("replica with id(" + id + ") does not exist");
+ return;
+ // throw new IllegalStateException("replica with id(" + id + ") does not exist");
}
PartitionReplica replica = replicas.remove(id);
appCtx.getReplicationManager().unregister(replica);
@@ -119,6 +123,17 @@
final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
partitions.add(partition);
+ // TODO should we just persist the takeover success on metakv to prevent a race between the CC being active
+ // and the partition being active?
+ ReplicaPromotionMessage message =
+ new ReplicaPromotionMessage(appCtx.getTransactionSubsystem().getId(), partition);
+ INCMessageBroker messageBroker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ try {
+ messageBroker.sendMessageToPrimaryCC(message);
+ } catch (Exception e) {
+ //TODO should we persist partition takeover success in metakv?
+ LOGGER.error("failed to notify CC of replica promotion");
+ }
}
@Override
@@ -147,8 +162,9 @@
final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
for (LocalResource resource : partitionResources.values()) {
- datasetLifecycleManager.close(resource.getPath());
+ datasetLifecycleManager.closeIfOpen(resource.getPath());
}
+ datasetLifecycleManager.closePartition(partition);
}
private boolean isSelf(ReplicaIdentifier id) {
@@ -157,6 +173,6 @@
int port = appConfig.getInt(NCConfig.Option.REPLICATION_LISTEN_PORT);
final InetSocketAddress replicaAddress = new InetSocketAddress(host, port);
- return id.equals(ReplicaIdentifier.of(id.getPartition(), replicaAddress));
+ return id.equals(ReplicaIdentifier.of(id.getPartition(), id.getNodeId(), replicaAddress, id.getStrategy()));
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..cacb15a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@
import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,8 @@
@Override
public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
pendingStartupCompletionNodes.remove(nodeId);
- clusterManager.updateNodeState(nodeId, false, null);
+ //TODO on node failure, we need to set all of it's active partitions to inactive
+ clusterManager.updateNodeState(nodeId, false, null, null);
if (nodeId.equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, false);
}
@@ -138,7 +136,8 @@
private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
nodeSecretsMap.put(nodeId, msg.getSecrets());
- List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+ List<INCLifecycleTask> tasks =
+ buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
@@ -157,7 +156,7 @@
return;
}
if (msg.isSuccess()) {
- clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
+ clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getPartitions());
if (msg.getNodeId().equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, true);
}
@@ -167,7 +166,8 @@
}
}
- protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+ protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
+ Set<Integer> activePartitions) {
LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
state);
final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +175,7 @@
case ACTIVE:
return buildActiveNCRegTasks(isMetadataNode);
case IDLE:
- return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+ return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
default:
return new ArrayList<>();
}
@@ -210,15 +210,18 @@
}
}
- protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
+ protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
+ Set<Integer> activePartitions) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
if (state == SystemState.CORRUPTED) {
- // need to perform local recovery for node partitions
- LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
- .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+ // need to perform local recovery for node active partitions
+ //TODO ensure recovery is done for all active partitions before the CheckpointTask since that
+ // might delete any old transaction log files
+ LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
tasks.add(rt);
}
+
if (replicationEnabled) {
tasks.add(new StartReplicationServiceTask());
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..c70b6bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.replication.message;
+import java.util.Set;
+
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@
private final boolean success;
private Throwable exception;
private final NcLocalCounters localCounters;
+ private Set<Integer> partitions;
- public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
+ public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters,
+ Set<Integer> partitions) {
this.nodeId = nodeId;
this.success = success;
this.localCounters = localCounters;
+ this.partitions = partitions;
}
@Override
@@ -63,6 +68,10 @@
return localCounters;
}
+ public Set<Integer> getPartitions() { // partitions the NC's replica manager reported with this task result
+ return partitions;
+ }
+
@Override
public MessageType getType() {
return MessageType.REGISTRATION_TASKS_RESULT;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..870413b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -37,25 +38,27 @@
public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
private static final Logger LOGGER = LogManager.getLogger();
- private static final long serialVersionUID = 2L;
+ private static final long serialVersionUID = 3L;
protected final SystemState state;
protected final String nodeId;
protected final NodeStatus nodeStatus;
protected final Map<String, Object> secrets;
+ protected final Set<Integer> activePartitions;
public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
- Map<String, Object> secretsEphemeral) {
+ Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
this.state = state;
this.nodeId = nodeId;
this.nodeStatus = nodeStatus;
this.secrets = new HashMap<>(secretsEphemeral);
+ // copy (as done for secrets) so this serializable message owns a HashSet snapshot; callers may pass a set
+ // built with Collectors.toSet(), which guarantees neither serializability nor mutability
+ this.activePartitions = activePartitions == null ? null : new HashSet<>(activePartitions);
}
public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
- Map<String, Object> secretsEphemeral) throws HyracksDataException {
+ Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException {
try {
- RegistrationTasksRequestMessage msg =
- new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
+ RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+ systemState, secretsEphemeral, activePartitions);
((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@
public Map<String, Object> getSecrets() {
return secrets;
}
+
+ public Set<Integer> getActivePartitions() { // partition ids the NC reported as active at registration
+ return activePartitions;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..7848849 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
package org.apache.asterix.app.replication.message;
import java.util.List;
+import java.util.Set;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@
}
NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
(NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
- NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+ Set<Integer> partitions = appCtx.getReplicaManager().getPartitions();
+ NCLifecycleTaskReportMessage result =
+ new NCLifecycleTaskReportMessage(nodeId, success, localCounter, partitions);
result.setException(exception);
try {
broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3571d88..76eb184 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -42,6 +42,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.asterix.active.ActivityState;
@@ -272,6 +273,7 @@
protected final ICcApplicationContext appCtx;
protected final SessionOutput sessionOutput;
protected final SessionConfig sessionConfig;
+ protected final ReentrantReadWriteLock compilationLock;
protected Dataverse activeDataverse;
protected final List<FunctionDecl> declaredFunctions;
protected final ILangCompilationProvider compilationProvider;
@@ -290,6 +292,7 @@
this.appCtx = appCtx;
this.lockManager = appCtx.getMetadataLockManager();
this.lockUtil = appCtx.getMetadataLockUtil();
+ this.compilationLock = appCtx.getCompilationLock();
this.statements = statements;
this.sessionOutput = output;
this.sessionConfig = output.config();
@@ -3367,12 +3370,14 @@
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() throws AlgebricksException {
+ compilationLock.readLock().lock();
lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
}
@Override
public void unlock() {
metadataProvider.getLocks().unlock();
+ compilationLock.readLock().unlock();
}
};
final IStatementCompiler compiler = () -> {
@@ -3962,6 +3967,7 @@
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() {
+ compilationLock.readLock().lock();
}
@Override
@@ -3969,6 +3975,7 @@
metadataProvider.getLocks().unlock();
// release external datasets' locks acquired during compilation of the query
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+ compilationLock.readLock().unlock();
}
};
final IStatementCompiler compiler = () -> {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..89f7741 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -36,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -299,8 +300,10 @@
final Map httpSecrets =
apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
: Collections.emptyMap();
+ Set<Integer> activePartitions = Arrays.stream(runtimeContext.getNodeProperties().getActivePartitions())
+ .map(Integer::parseInt).collect(Collectors.toSet());
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
- currentStatus, systemState, httpSecrets);
+ currentStatus, systemState, httpSecrets, activePartitions);
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..370b330 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -200,7 +200,8 @@
private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
throws HyracksDataException {
- NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+ NCLifecycleTaskReportMessage msg =
+ new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), Collections.emptySet());
applicationContext.getNcLifecycleCoordinator().process(msg);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index b03af55..48bada3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -169,4 +169,8 @@
* @return the current datasets io stats
*/
StorageIOStats getDatasetsIOStats();
+
+ void closeIfOpen(String resourcePath) throws HyracksDataException; // no-op if not registered; throws if in use
+
+ void closePartition(int partitionId); // discards every dataset's per-partition state for partitionId
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc99421..b98cbe5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -71,7 +71,10 @@
@Override
public ClusterPartition clone() {
- return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+ clone.setActiveNodeId(activeNodeId); // also copy the mutable assignment state (active node and active flag)
+ clone.setActive(active);
+ return clone;
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..94f80cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@
* @param nodeId
* @param active
* @param ncLocalCounters
+ * @param partitions
* @throws HyracksDataException
*/
- void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
+ void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> partitions)
+ throws HyracksDataException;
/**
* Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..efc9b6a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,11 @@
STARTING_PARTITION_ID(
OptionTypes.INTEGER,
-1,
- "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
+ "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
+ ACTIVE_PARTITIONS(
+ OptionTypes.STRING_ARRAY,
+ new String[] {},
+ "List of partitions this node is currently the master of");
private final IOptionType type;
private final Object defaultValue;
@@ -95,7 +99,7 @@
@Override
public boolean hidden() {
- return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+ return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS; // ACTIVE_PARTITIONS is hidden too
}
}
@@ -115,4 +119,8 @@
public String getTxnLogDir() {
return accessor.getString(Option.TXN_LOG_DIR);
}
+
+ public String[] getActivePartitions() { // raw ACTIVE_PARTITIONS values (partition ids as strings); empty by default
+ return accessor.getStringArray(Option.ACTIVE_PARTITIONS);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5d1a8e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -132,6 +132,7 @@
}
stores.put(ncId, nodeStores);
nodePartitionsMap.put(ncId, nodePartitions);
+ // join explicitly: a bare String[] binds as the varargs array and only its first element would be logged
+ LOGGER.info("Node {} active partitions [{}]", ncId,
+ String.join(",", nodeCfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS)));
}
private void loadAsterixBuildProperties() throws AsterixException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b2f4034..00a30e5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -559,6 +559,51 @@
return stats;
}
+ /**
+ * Closes the index at {@code resourcePath} if it is registered with this manager. The index's dataset is removed
+ * from the cache once it has no remaining indexes or references (external datasets excluded). Does nothing if
+ * the dataset or the index is not registered.
+ *
+ * @param resourcePath the index resource path
+ * @throws HyracksDataException if the index is still referenced or has active operations
+ */
+ @Override
+ public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
+ validateDatasetLifecycleManagerState();
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+ DatasetResource dsr = datasets.get(did);
+ if (dsr == null) {
+ return;
+ }
+ IndexInfo iInfo = dsr.getIndexInfo(resourceID);
+ if (iInfo == null) {
+ return;
+ }
+
+ PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+ if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+ if (LOGGER.isErrorEnabled()) {
+ // opTracker may be null here when the index is in use only because of its reference count
+ final String logMsg = String.format(
+ "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+ resourcePath, iInfo.getReferenceCount(),
+ opTracker == null ? 0 : opTracker.getNumActiveOperations());
+ LOGGER.error(logMsg);
+ }
+ throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+ StoragePathUtil.getIndexNameFromPath(resourcePath));
+ }
+
+ // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+ DatasetInfo dsInfo = dsr.getDatasetInfo();
+ dsInfo.waitForIO();
+ closeIndex(iInfo);
+ dsInfo.removeIndex(resourceID);
+ synchronized (dsInfo) {
+ if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+ && !dsInfo.isExternal()) {
+ removeDatasetFromCache(dsInfo.getDatasetID());
+ }
+ }
+ }
+
+ /**
+ * Discards the per-partition state kept for {@code partitionId} by every registered dataset.
+ *
+ * @param partitionId the partition being closed
+ */
+ @Override
+ public synchronized void closePartition(int partitionId) {
+ datasets.values().forEach(dsr -> dsr.removePartition(partitionId));
+ }
+
private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
if (indexInfo.isOpen()) {
ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 54e1976..db9eabb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -182,4 +182,10 @@
public boolean isMetadataDataset() {
return MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID());
}
+
+ public void removePartition(int partitionId) { // drops this dataset's op tracker, id generator and rate limiter for partitionId
+ datasetPrimaryOpTrackers.remove(partitionId);
+ datasetComponentIdGenerators.remove(partitionId);
+ datasetRateLimiters.remove(partitionId);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index ad90814..72fb32c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.dataflow;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IMetadataLockManager;
@@ -149,4 +151,11 @@
* @return the adapter factory service
*/
IAdapterFactoryService getAdapterFactoryService();
+
+ /**
+ * Gets the compilation lock
+ * <p>
+ * QueryTranslator acquires its read lock when locking a statement for compilation and releases it on unlock.
+ * NOTE(review): presumably the write lock is meant to block statement compilation (e.g. while partitions are
+ * reassigned) — confirm against its writers.
+ *
+ * @return the compilation lock
+ */
+ ReentrantReadWriteLock getCompilationLock();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
index c969777..1dfc2e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
@@ -24,4 +24,9 @@
public boolean isMatch(int datasetId) {
return true;
}
+
+ @Override
+ public String getName() {
+ return "all"; // short strategy name; ReplicaIdentifier appends it to replica ids
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
index 0ad8be2..a74b621 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -25,4 +25,6 @@
* @return True, if the dataset should be replicated. Otherwise false.
*/
boolean isMatch(int datasetId);
+
+ String getName(); // short name of the strategy; becomes part of each ReplicaIdentifier id
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index 2b97fe7..5278208 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -26,4 +26,9 @@
public boolean isMatch(int datasetId) {
return MetadataIndexImmutableProperties.isMetadataDataset(datasetId);
}
+
+ @Override
+ public String getName() {
+ return "metadata"; // short strategy name; ReplicaIdentifier appends it to replica ids
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
index 8ffaa15..49083c8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -24,4 +24,9 @@
public boolean isMatch(int datasetId) {
return false;
}
+
+ @Override
+ public String getName() {
+ return "none"; // short strategy name; ReplicaIdentifier appends it to replica ids
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
index c4bb74c..af285be 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ReplicaIdentifier.java
@@ -20,22 +20,28 @@
import java.net.InetSocketAddress;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.hyracks.util.NetworkUtil;
public class ReplicaIdentifier {
private final int partition;
private final String id;
+ private final IReplicationStrategy strategy;
private volatile InetSocketAddress location;
+ private final String nodeId;
- private ReplicaIdentifier(int partition, InetSocketAddress location) {
+ private ReplicaIdentifier(int partition, String nodeId, InetSocketAddress location, IReplicationStrategy strategy) {
this.partition = partition;
this.location = location;
- id = partition + "@" + location.getHostString() + ":" + location.getPort();
+ this.nodeId = nodeId; // NOTE(review): nodeId is not part of id below; confirm equals()/hashCode() need not use it
+ this.strategy = strategy;
+ id = partition + "@" + location.getHostString() + ":" + location.getPort() + ":" + strategy.getName(); // strategy must be non-null
}
- public static ReplicaIdentifier of(int partition, InetSocketAddress location) {
- return new ReplicaIdentifier(partition, location);
+ public static ReplicaIdentifier of(int partition, String nodeId, InetSocketAddress location,
+ IReplicationStrategy strategy) { // strategy is required: its name becomes part of the identifier
+ return new ReplicaIdentifier(partition, nodeId, location, strategy);
}
public int getPartition() {
@@ -52,6 +58,14 @@
return location;
}
+ public String getNodeId() { // node id given to of(...)
+ return nodeId;
+ }
+
+ public IReplicationStrategy getStrategy() { // decides which datasets are replicated to this replica
+ return strategy;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index dd953c4..9bf0337 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -101,7 +101,8 @@
private void process(IReplicationJob job) {
try {
- if (skip(job)) {
+ DatasetResourceReference resourceRef = getJobDatasetResource(job);
+ if (skip(resourceRef)) {
return;
}
synchronized (transferLock) {
@@ -109,15 +110,17 @@
return;
}
final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
- final int indexPartition = getJobPartition(job);
+ final int indexPartition = resourceRef.getPartitionNum();
for (ReplicationDestination dest : destinations) {
try {
Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
- if (!partitionReplica.isPresent()) {
+ if (partitionReplica.isEmpty()) {
continue;
}
PartitionReplica replica = (PartitionReplica) partitionReplica.get();
- synchronizer.sync(replica);
+ if (replica.getIdentifier().getStrategy().isMatch(resourceRef.getDatasetId())) {
+ synchronizer.sync(replica);
+ }
} catch (Exception e) {
handleFailure(dest, e);
}
@@ -129,23 +132,20 @@
}
}
- private boolean skip(IReplicationJob job) {
+ /**
+ * @param indexFileRef the dataset resource reference of the job's index, or null if none was found
+ * @return true if the job must not be replicated: it has no resource reference, or its dataset does not match
+ * this node's replication strategy
+ */
+ private boolean skip(DatasetResourceReference indexFileRef) {
+ return indexFileRef == null || !replicationStrategy.isMatch(indexFileRef.getDatasetId());
+ }
+
+ /**
+ * Resolves the dataset resource reference of any file of {@code job}, logging a warning when none exists.
+ *
+ * @return the reference, or null if the resource repository has none for the job's file
+ */
+ private DatasetResourceReference getJobDatasetResource(IReplicationJob job) {
try {
- final String fileToReplicate = job.getAnyFile();
- final Optional<DatasetResourceReference> indexFileRefOpt =
- resourceRepository.getLocalResourceReference(fileToReplicate);
- if (!indexFileRefOpt.isPresent()) {
- LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
- return true;
- }
- return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+ final String fileToReplicate = job.getAnyFile();
+ final Optional<DatasetResourceReference> indexFileRefOpt =
+ resourceRepository.getLocalResourceReference(fileToReplicate);
+ if (indexFileRefOpt.isEmpty()) {
+ // warn so that files skipped for lack of a resource reference are not skipped silently
+ LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
+ return null;
+ }
+ return indexFileRefOpt.get();
} catch (HyracksDataException e) {
throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
}
}
- private int getJobPartition(IReplicationJob job) {
- return ResourceReference.of(job.getAnyFile()).getPartitionNum();
- }
private void closeChannels() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6a23ae6..a805d82 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -128,6 +128,7 @@
public void replicate(ILogRecord logRecord) throws InterruptedException {
if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
synchronized (destinations) {
+ //TODO filter destinations
ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
new file mode 100644
index 0000000..e73bcb2
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaPromotionMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * A CC-addressed message announcing that node {@code nodeId} is the new master of {@code partition}. Handling it
+ * updates the cluster partition to be active on that node and refreshes the cluster state.
+ */
+public class ReplicaPromotionMessage implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final int partition;
+
+ /**
+ * @param nodeId the id of the node that became the partition's master
+ * @param partition the id of the promoted partition
+ */
+ public ReplicaPromotionMessage(String nodeId, int partition) {
+ this.nodeId = nodeId;
+ this.partition = partition;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ LOGGER.info("Partition {} master changed, new master {}", partition, nodeId);
+ IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
+ clusterStateManager.updateClusterPartition(partition, nodeId, true);
+ clusterStateManager.refreshState();
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 9099c88..2576c4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -19,6 +19,7 @@
package org.apache.asterix.runtime.utils;
import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.asterix.common.api.IConfigValidator;
@@ -101,6 +102,8 @@
private final IConfigValidator configValidator;
private final IAdapterFactoryService adapterFactoryService;
+ private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
+
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
@@ -314,4 +317,9 @@
public IAdapterFactoryService getAdapterFactoryService() {
return adapterFactoryService;
}
+
+ /**
+ * Returns this context's compilation lock, a fair {@link ReentrantReadWriteLock} created once with the
+ * context; every call returns the same instance.
+ *
+ * @return the shared compilation lock
+ */
+ @Override
+ public ReentrantReadWriteLock getCompilationLock() {
+ return this.compilationLock;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..fc99792 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -24,12 +24,14 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
@@ -83,6 +85,8 @@
@Override
public void setCcAppCtx(ICcApplicationContext appCtx) {
this.appCtx = appCtx;
+ //TODO we need to maintain the node2PartitionsMap in the same state as before a failover and only change
+ // each partition's current active node
node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName();
@@ -139,20 +143,33 @@
}
+ /**
+ * Updates whether {@code nodeId} is a participant (refreshing its counters when active) and the active
+ * state of its partitions.
+ *
+ * @param nodePartitions ids of the partitions to update for this node; when {@code null}, the
+ * partitions whose current active node is {@code nodeId} are updated instead
+ */
@Override
- public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
+ public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters,
+ Set<Integer> nodePartitions) {
if (active) {
updateClusterCounters(nodeId, localCounters);
participantNodes.add(nodeId);
} else {
participantNodes.remove(nodeId);
}
- ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
- // if this isn't a storage node, it will not have cluster partitions
if (nodePartitions != null) {
- for (ClusterPartition p : nodePartitions) {
- updateClusterPartition(p.getPartitionId(), nodeId, active);
+ for (Integer partitionId : nodePartitions) {
+ updateClusterPartition(partitionId, nodeId, active);
+ }
+ } else {
+ // No explicit partition set was given: fall back to the partitions currently active on this node
+ // (presumably none for a non-storage node). The comparison is null-safe because a partition may
+ // have no active node assigned -- TODO confirm ClusterPartition's initial activeNodeId.
+ // Matches are collected first so updateClusterPartition never runs while the stream is open.
+ List<ClusterPartition> nodeActivePartitions = clusterPartitions.values().stream()
+ .filter(clusterPartition -> nodeId.equals(clusterPartition.getActiveNodeId()))
+ .collect(Collectors.toList());
+ for (ClusterPartition partition : nodeActivePartitions) {
+ updateClusterPartition(partition.getPartitionId(), nodeId, active);
}
}
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
index 0a0e026..c7be4ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/MappedFileSplit.java
@@ -59,7 +59,7 @@
@Override
public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException {
+ // Resolved lazily on first call and reused afterwards; this method takes no lock around `cached`.
if (cached == null) {
- cached = new FileReference(ioManager.getIODevices().get(ioDevice), getPath());
+ // NOTE(review): this now delegates to the superclass instead of resolving getPath() against
+ // ioManager.getIODevices().get(ioDevice); confirm the superclass still honors this split's
+ // mapped IO device -- TODO confirm.
+ cached = super.getFileReference(ioManager);
}
return cached;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index f54a62c..5166970 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -184,6 +184,9 @@
public synchronized void failNode(String nodeId) throws HyracksException {
NodeControllerState state = nodeRegistry.get(nodeId);
+ if (state == null) {
+ return;
+ }
Set<JobId> affectedJobIds = state.getActiveJobIds();
// Removes the node from node map.
nodeRegistry.remove(nodeId);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
Gerrit-Change-Number: 12404
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: newchange
Change in asterixdb[master]: PLEASE EDIT to provide a meaningful commit message!
Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
Anon. E. Moose #1000171 has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404 )
Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................
Patch Set 1: Contrib-2
Analytics Compatibility Compilation Failed
https://cbjenkins.page.link/c23YKBVJTLRMNNwE6 : UNSTABLE
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
Gerrit-Change-Number: 12404
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-CC: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Comment-Date: Tue, 20 Jul 2021 00:33:31 +0000
Gerrit-HasComments: No
Gerrit-Has-Labels: Yes
Gerrit-MessageType: comment
Change in asterixdb[master]: PLEASE EDIT to provide a meaningful commit message!
Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Jenkins <je...@fulliautomatix.ics.uci.edu>:
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404 )
Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................
Patch Set 1: Integration-Tests-1
Integration Tests Failed
https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/12240/ : UNSTABLE
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12404
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7997156936612bd068285710eacb5f6f8d0e0d3c
Gerrit-Change-Number: 12404
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <mh...@apache.org>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Comment-Date: Tue, 20 Jul 2021 01:48:36 +0000
Gerrit-HasComments: No
Gerrit-Has-Labels: Yes
Gerrit-MessageType: comment