You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/01/27 00:31:27 UTC
[4/4] incubator-asterixdb git commit: Asterix NCs Fault Tolerance
Asterix NCs Fault Tolerance
This change includes the following:
- Adapt replication to unique partitions storage.
- Implement auto failover for failing NCs.
- Implement auto failover for metadata node.
- Fix ASTERIXDB-1251 by using a proper error message.
- Basic replication test cases using a Vagrant virtual cluster for:
1. LSM bulkload components replication.
2. LSM Memory components replication and recovery.
3. Metadata node takeover.
These test cases will be part of the cluster test profile.
Change-Id: Ice26d980912a315fcb3efdd571d6ce88717cfea4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/573
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/8fc8bf8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/8fc8bf8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/8fc8bf8b
Branch: refs/heads/master
Commit: 8fc8bf8b510bdc635f949f2eebf8b4d0d2a6b008
Parents: 5b068d2
Author: Murtadha Hubail <mh...@uci.edu>
Authored: Sat Jan 23 22:26:59 2016 -0800
Committer: Murtadha Hubail <hu...@gmail.com>
Committed: Tue Jan 26 15:26:32 2016 -0800
----------------------------------------------------------------------
.../api/common/AsterixAppRuntimeContext.java | 59 +++-
.../bootstrap/AsterixGlobalRecoveryManager.java | 217 ------------
.../bootstrap/CCApplicationEntryPoint.java | 12 +-
.../bootstrap/ClusterLifecycleListener.java | 18 +-
.../bootstrap/GlobalRecoveryManager.java | 225 ++++++++++++
.../bootstrap/NCApplicationEntryPoint.java | 36 +-
.../asterix/messaging/CCMessageBroker.java | 26 +-
.../asterix/messaging/NCMessageBroker.java | 33 ++
.../common/api/IAsterixAppRuntimeContext.java | 14 +
.../common/cluster/IGlobalRecoveryMaanger.java | 29 ++
.../config/AsterixMetadataProperties.java | 4 +
.../config/AsterixReplicationProperties.java | 13 +-
.../IAsterixApplicationContextInfo.java | 3 +
.../TakeoverMetadataNodeRequestMessage.java | 29 ++
.../TakeoverMetadataNodeResponseMessage.java | 38 ++
.../TakeoverPartitionsRequestMessage.java | 72 ++++
.../TakeoverPartitionsResponseMessage.java | 50 +++
.../messaging/api/IApplicationMessage.java | 6 +-
.../common/messaging/api/ICCMessageBroker.java | 32 ++
.../replication/IRemoteRecoveryManager.java | 16 +
.../replication/IReplicaResourcesManager.java | 13 +-
.../common/transactions/IRecoveryManager.java | 10 +
.../asterix/common/utils/StoragePathUtil.java | 10 +-
.../src/main/resources/schema/cluster.xsd | 4 +-
.../apache/asterix/test/aql/TestExecutor.java | 27 ++
.../asterix/event/util/PatternCreator.java | 29 --
asterix-installer/pom.xml | 4 +
.../installer/command/ValidateCommand.java | 6 -
.../asterix/installer/test/ReplicationIT.java | 257 ++++++++++++++
.../clusterts/cluster_with_replication.xml | 63 ++++
.../src/test/resources/clusterts/known_hosts | 10 +-
.../failover/bulkload/bulkload.1.ddl.aql | 54 +++
.../failover/bulkload/bulkload.2.update.aql | 32 ++
.../failover/bulkload/bulkload.3.txnqbc.aql | 31 ++
.../bulkload/bulkload.4.vagrant_script.aql | 1 +
.../failover/bulkload/bulkload.5.sleep.aql | 1 +
.../failover/bulkload/bulkload.6.txnqar.aql | 31 ++
.../mem_component_recovery.1.ddl.aql | 58 ++++
.../mem_component_recovery.2.update.aql | 34 ++
.../mem_component_recovery.3.txnqbc.aql | 32 ++
.../mem_component_recovery.4.vagrant_script.aql | 1 +
.../mem_component_recovery.5.sleep.aql | 1 +
.../mem_component_recovery.6.txnqar.aql | 32 ++
.../metadata_node/metadata_node.1.ddl.aql | 55 +++
.../metadata_node/metadata_node.2.txnqbc.aql | 30 ++
.../metadata_node.3.vagrant_script.aql | 1 +
.../metadata_node/metadata_node.4.sleep.aql | 1 +
.../metadata_node/metadata_node.5.txnqar.aql | 32 ++
.../integrationts/replication/testsuite.xml | 37 ++
.../metadata/bootstrap/MetadataBootstrap.java | 6 +
.../utils/SplitsAndConstraintsUtil.java | 6 +
.../asterix/om/util/AsterixAppContextInfo.java | 19 +-
.../om/util/AsterixClusterProperties.java | 189 +++++++++-
.../functions/AsterixReplicationProtocol.java | 346 -------------------
.../functions/ReplicationProtocol.java | 346 +++++++++++++++++++
.../logging/ReplicationLogBuffer.java | 4 +-
.../management/ReplicaEventNotifier.java | 6 +-
.../management/ReplicaStateChecker.java | 4 +-
.../management/ReplicationChannel.java | 123 +++----
.../management/ReplicationManager.java | 92 +++--
.../recovery/RemoteRecoveryManager.java | 31 +-
.../replication/storage/AsterixFilesUtil.java | 78 -----
.../storage/AsterixLSMIndexFileProperties.java | 191 ----------
.../storage/LSMComponentProperties.java | 33 +-
.../storage/LSMIndexFileProperties.java | 162 +++++++++
.../storage/ReplicaResourcesManager.java | 331 ++++++++----------
.../src/test/resources/data/fbu.adm | 10 +
.../test/resources/scripts/delete_storage.sh | 18 +
.../test/resources/scripts/kill_cc_and_nc.sh | 18 +
.../PersistentLocalResourceRepository.java | 73 ++--
...ersistentLocalResourceRepositoryFactory.java | 8 +-
.../service/recovery/RecoveryManager.java | 273 +++++++++++++--
72 files changed, 2814 insertions(+), 1382 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 6d0b321..8a40876 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -19,7 +19,10 @@
package org.apache.asterix.api.common;
import java.io.IOException;
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
import java.util.List;
+import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.AsterixThreadExecutor;
@@ -46,9 +49,14 @@ import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.external.feed.api.IFeedManager;
import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.IAsterixStateProxy;
+import org.apache.asterix.metadata.api.IMetadataNode;
+import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.replication.management.ReplicationChannel;
@@ -85,6 +93,7 @@ import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
+ private static final Logger LOGGER = Logger.getLogger(AsterixAppRuntimeContext.class.getName());
private static final AsterixPropertiesAccessor ASTERIX_PROPERTIES_ACCESSOR;
@@ -128,8 +137,9 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
private IReplicationManager replicationManager;
private IRemoteRecoveryManager remoteRecoveryManager;
private IReplicaResourcesManager replicaResourcesManager;
+ private final int metadataRmiPort;
- public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
+ public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort) {
this.ncApplicationContext = ncApplicationContext;
compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR);
@@ -140,6 +150,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
buildProperties = new AsterixBuildProperties(ASTERIX_PROPERTIES_ACCESSOR);
replicationProperties = new AsterixReplicationProperties(ASTERIX_PROPERTIES_ACCESSOR,
AsterixClusterProperties.INSTANCE.getCluster());
+ this.metadataRmiPort = metadataRmiPort;
}
public void initialize(boolean initialRun) throws IOException, ACIDException, AsterixException {
@@ -159,7 +170,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
metadataMergePolicyFactory = new PrefixMergePolicyFactory();
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
- ioManager, ncApplicationContext.getNodeId());
+ ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+
localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
@@ -186,9 +198,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
if (replicationProperties.isReplicationEnabled()) {
String nodeId = ncApplicationContext.getNodeId();
- replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
- AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), nodeId,
- replicationProperties.getReplicationStore());
+ replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -225,14 +235,14 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
* to process any logs that might be generated during stopping these components
*/
lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
- lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
/**
- * ReplicationManager must be stopped after indexLifecycleManager so that any logs/files generated
- * during closing datasets are sent to remote replicas
+ * ReplicationManager must be stopped after indexLifecycleManager and recovery manager
+ * so that any logs/files generated during closing datasets or checkpoints are sent to remote replicas
*/
if (replicationManager != null) {
lccm.register(replicationManager);
}
+ lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
/**
* Stopping indexLifecycleManager will flush and close all datasets.
*/
@@ -380,4 +390,35 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
public void initializeResourceIdFactory() throws HyracksDataException {
resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
}
+
+ @Override
+ public void initializeMetadata(boolean newUniverse) throws Exception {
+ IAsterixStateProxy proxy = null;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Bootstrapping metadata");
+ }
+ MetadataNode.INSTANCE.initialize(this);
+
+ proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
+ if (proxy == null) {
+ throw new IllegalStateException("Metadata node cannot access distributed state");
+ }
+
+ // This is a special case, we just give the metadataNode directly.
+ // This way we can delay the registration of the metadataNode until
+ // it is completely initialized.
+ MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
+ MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse);
+ MetadataBootstrap.startDDLRecovery();
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Metadata node bound");
+ }
+ }
+
+ @Override
+ public void exportMetadataNodeStub() throws RemoteException {
+ IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
+ ((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
deleted file mode 100644
index 4fae7e9..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.hyracks.bootstrap;
-
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.common.config.MetadataConstants;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.feed.CentralFeedManager;
-import org.apache.asterix.file.ExternalIndexingOperations;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
-
- private static ClusterState state;
- private static final Logger LOGGER = Logger.getLogger(AsterixGlobalRecoveryManager.class.getName());
- private HyracksConnection hcc;
- public static AsterixGlobalRecoveryManager INSTANCE;
-
- public AsterixGlobalRecoveryManager(HyracksConnection hcc) throws Exception {
- state = AsterixClusterProperties.INSTANCE.getState();
- this.hcc = hcc;
- }
-
- @Override
- public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
- state = AsterixClusterProperties.INSTANCE.getState();
- AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false);
- return null;
- }
-
- @Override
- public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
- // perform global recovery if state changed to active
- final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
- boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
- if (needToRecover) {
- Thread recoveryThread = new Thread(new Runnable() {
- @Override
- public void run() {
- LOGGER.info("Starting AsterixDB's Global Recovery");
- MetadataTransactionContext mdTxnCtx = null;
- try {
- Thread.sleep(4000);
- MetadataManager.INSTANCE.init();
- // Loop over datasets
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
- for (Dataverse dataverse : dataverses) {
- if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse, CentralFeedManager.getInstance());
- List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
- dataverse.getDataverseName());
- for (Dataset dataset : datasets) {
- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- // External dataset
- // Get indexes
- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
- dataset.getDataverseName(), dataset.getDatasetName());
- if (indexes.size() > 0) {
- // Get the state of the dataset
- ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset
- .getDatasetDetails();
- ExternalDatasetTransactionState datasetState = dsd.getState();
- if (datasetState == ExternalDatasetTransactionState.BEGIN) {
- List<ExternalFile> files = MetadataManager.INSTANCE
- .getDatasetExternalFiles(mdTxnCtx, dataset);
- // if persumed abort, roll backward
- // 1. delete all pending files
- for (ExternalFile file : files) {
- if (file.getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
- }
- }
- // 2. clean artifacts in NCs
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(
- dataset, indexes, metadataProvider);
- executeHyracksJob(jobSpec);
- // 3. correct the dataset state
- ((ExternalDatasetDetails) dataset.getDatasetDetails())
- .setState(ExternalDatasetTransactionState.COMMIT);
- MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- } else if (datasetState == ExternalDatasetTransactionState.READY_TO_COMMIT) {
- List<ExternalFile> files = MetadataManager.INSTANCE
- .getDatasetExternalFiles(mdTxnCtx, dataset);
- // if ready to commit, roll forward
- // 1. commit indexes in NCs
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(
- dataset, indexes, metadataProvider);
- executeHyracksJob(jobSpec);
- // 2. add pending files in metadata
- for (ExternalFile file : files) {
- if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP) {
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
- file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
- MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
- } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
- // find original file
- for (ExternalFile originalFile : files) {
- if (originalFile.getFileName().equals(file.getFileName())) {
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
- file);
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
- originalFile);
- break;
- }
- }
- } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
- // find original file
- for (ExternalFile originalFile : files) {
- if (originalFile.getFileName().equals(file.getFileName())) {
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
- file);
- MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
- originalFile);
- originalFile.setSize(file.getSize());
- MetadataManager.INSTANCE.addExternalFile(mdTxnCtx,
- originalFile);
- }
- }
- }
- }
- // 3. correct the dataset state
- ((ExternalDatasetDetails) dataset.getDatasetDetails())
- .setState(ExternalDatasetTransactionState.COMMIT);
- MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- }
- }
- }
- }
- }
- }
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e) {
- // This needs to be fixed <-- Needs to shutdown the system -->
- /*
- * Note: Throwing this illegal state exception will terminate this thread
- * and feeds listeners will not be notified.
- */
- LOGGER.severe("Global recovery was not completed successfully" + e);
- try {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- } catch (Exception e1) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Exception in aborting" + e.getMessage());
- }
- throw new IllegalStateException(e1);
- }
- }
- AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(true);
- LOGGER.info("Global Recovery Completed");
- }
- });
- state = newState;
- recoveryThread.start();
- }
- return null;
- }
-
- private void executeHyracksJob(JobSpecification spec) throws Exception {
- spec.setMaxReattempts(0);
- JobId jobId = hcc.startJob(spec);
- hcc.waitForCompletion(jobId);
- }
-
- @Override
- public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
- // Do nothing
- }
-
- @Override
- public void notifyStateChange(ClusterState previousState, ClusterState newState) {
- // Do nothing?
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 2a7b3e4..f2fc9bf 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -79,7 +79,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
- messageBroker = new CCMessageBroker((ClusterControllerService)ccAppCtx.getControllerService());
+ messageBroker = new CCMessageBroker((ClusterControllerService) ccAppCtx.getControllerService());
this.appCtx = ccAppCtx;
if (LOGGER.isLoggable(Level.INFO)) {
@@ -87,7 +87,11 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
}
appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
- AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection());
+ GlobalRecoveryManager.INSTANCE = new GlobalRecoveryManager(
+ (HyracksConnection) getNewHyracksClientConnection());
+
+ AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(),
+ GlobalRecoveryManager.INSTANCE);
proxy = AsterixStateProxy.registerRemoteObject();
appCtx.setDistributedState(proxy);
@@ -111,9 +115,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
centralFeedManager = CentralFeedManager.getInstance();
centralFeedManager.start();
- AsterixGlobalRecoveryManager.INSTANCE = new AsterixGlobalRecoveryManager(
- (HyracksConnection) getNewHyracksClientConnection());
- ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE);
+ ClusterManager.INSTANCE.registerSubscriber(GlobalRecoveryManager.INSTANCE);
AsterixReplicationProperties asterixRepliactionProperties = AsterixAppContextInfo.getInstance()
.getReplicationProperties();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 9505692..00b7391 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -95,19 +95,12 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("NC: " + deadNode + " left");
}
- AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
-
//if metadata node failed, we need to rebind the proxy connection when it joins again.
- //Note: the format for the NC should be (INSTANCE-NAME)_(NC-ID)
- if (AsterixClusterProperties.INSTANCE.getCluster() != null) {
- String instanceName = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName();
- String metadataNodeName = AsterixClusterProperties.INSTANCE.getCluster().getMetadataNode();
- String completeMetadataNodeName = instanceName + "_" + metadataNodeName;
- if (deadNode.equals(completeMetadataNodeName)) {
- MetadataManager.INSTANCE.rebindMetadataNode = true;
- }
+ String metadataNode = AsterixClusterProperties.INSTANCE.getCurrentMetadataNode();
+ if (deadNode.equals(metadataNode)) {
+ MetadataManager.INSTANCE.rebindMetadataNode = true;
}
-
+ AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
}
updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();
@@ -166,7 +159,8 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
case REMOVE_NODE:
nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
nodeRemovalRequests.add(w);
- RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w, Status.IN_PROGRESS);
+ RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w,
+ Status.IN_PROGRESS);
pendingWorkResponses.add(response);
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
new file mode 100644
index 0000000..2bac1cf
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.hyracks.bootstrap;
+
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.IClusterManagementWorkResponse;
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.feed.CentralFeedManager;
+import org.apache.asterix.file.ExternalIndexingOperations;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
+
+ private static ClusterState state;
+ private static final Logger LOGGER = Logger.getLogger(GlobalRecoveryManager.class.getName());
+ private HyracksConnection hcc;
+ public static GlobalRecoveryManager INSTANCE;
+
+ public GlobalRecoveryManager(HyracksConnection hcc) throws Exception {
+ state = ClusterState.UNUSABLE;
+ this.hcc = hcc;
+ }
+
+ @Override
+ public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
+ state = AsterixClusterProperties.INSTANCE.getState();
+ AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false);
+ return null;
+ }
+
+ @Override
+ public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+ startGlobalRecovery();
+ return null;
+ }
+
+ private void executeHyracksJob(JobSpecification spec) throws Exception {
+ spec.setMaxReattempts(0);
+ JobId jobId = hcc.startJob(spec);
+ hcc.waitForCompletion(jobId);
+ }
+
+ @Override
+ public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+ // Do nothing
+ }
+
+ @Override
+ public void notifyStateChange(ClusterState previousState, ClusterState newState) {
+ // Do nothing?
+ }
+
+ @Override
+ public void startGlobalRecovery() {
+ // Runs global recovery on a background thread, but only when the cluster state has just changed to ACTIVE
+ final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
+ boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
+ if (needToRecover) {
+ Thread recoveryThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOGGER.info("Starting AsterixDB's Global Recovery");
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ Thread.sleep(4000);
+ MetadataManager.INSTANCE.init();
+ // Loop over the datasets of every dataverse except the metadata dataverse
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+ for (Dataverse dataverse : dataverses) {
+ if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse,
+ CentralFeedManager.getInstance());
+ List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+ dataverse.getDataverseName());
+ for (Dataset dataset : datasets) {
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ // External dataset
+ // Get indexes
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+ dataset.getDataverseName(), dataset.getDatasetName());
+ if (indexes.size() > 0) {
+ // Get the state of the dataset
+ ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset
+ .getDatasetDetails();
+ ExternalDatasetTransactionState datasetState = dsd.getState();
+ if (datasetState == ExternalDatasetTransactionState.BEGIN) {
+ List<ExternalFile> files = MetadataManager.INSTANCE
+ .getDatasetExternalFiles(mdTxnCtx, dataset);
+ // if presumed abort, roll backward
+ // 1. delete all pending files
+ for (ExternalFile file : files) {
+ if (file.getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+ }
+ }
+ // 2. clean artifacts in NCs
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ JobSpecification jobSpec = ExternalIndexingOperations
+ .buildAbortOp(dataset, indexes, metadataProvider);
+ executeHyracksJob(jobSpec);
+ // 3. correct the dataset state
+ ((ExternalDatasetDetails) dataset.getDatasetDetails())
+ .setState(ExternalDatasetTransactionState.COMMIT);
+ MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ } else if (datasetState == ExternalDatasetTransactionState.READY_TO_COMMIT) {
+ List<ExternalFile> files = MetadataManager.INSTANCE
+ .getDatasetExternalFiles(mdTxnCtx, dataset);
+ // if ready to commit, roll forward
+ // 1. commit indexes in NCs
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ JobSpecification jobSpec = ExternalIndexingOperations
+ .buildRecoverOp(dataset, indexes, metadataProvider);
+ executeHyracksJob(jobSpec);
+ // 2. add pending files in metadata
+ for (ExternalFile file : files) {
+ if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP) {
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+ file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
+ MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
+ } else if (file
+ .getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
+ // find original file
+ for (ExternalFile originalFile : files) {
+ if (originalFile.getFileName().equals(file.getFileName())) {
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+ file);
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+ originalFile);
+ break;
+ }
+ }
+ } else if (file
+ .getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
+ // find original file
+ for (ExternalFile originalFile : files) {
+ if (originalFile.getFileName().equals(file.getFileName())) {
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+ file);
+ MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+ originalFile);
+ originalFile.setSize(file.getSize());
+ MetadataManager.INSTANCE.addExternalFile(mdTxnCtx,
+ originalFile);
+ }
+ }
+ }
+ }
+ // 3. correct the dataset state
+ ((ExternalDatasetDetails) dataset.getDatasetDetails())
+ .setState(ExternalDatasetTransactionState.COMMIT);
+ MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ }
+ }
+ }
+ }
+ }
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ // TODO: This needs to be fixed <-- Needs to shutdown the system -->
+ /*
+ * Note: Throwing this illegal state exception will terminate this thread
+ * and feeds listeners will not be notified.
+ */
+ LOGGER.log(Level.SEVERE, "Global recovery was not completed successfully", e);
+ try {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ } catch (Exception e1) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "Exception in aborting", e1);
+ }
+ throw new IllegalStateException(e1);
+ }
+ }
+ AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(true);
+ LOGGER.info("Global Recovery Completed");
+ }
+ });
+ state = newState;
+ recoveryThread.start();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 3341387..dac9af5 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -20,7 +20,6 @@ package org.apache.asterix.hyracks.bootstrap;
import java.io.File;
import java.io.IOException;
-import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,10 +41,6 @@ import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataNode;
-import org.apache.asterix.metadata.api.IAsterixStateProxy;
-import org.apache.asterix.metadata.api.IMetadataNode;
import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -102,7 +97,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
LOGGER.info("Starting Asterix node controller: " + nodeId);
}
- runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
+ runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext, metadataRmiPort);
AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getMetadataProperties();
if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
@@ -215,33 +210,12 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
localResourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
}
- IAsterixStateProxy proxy = null;
isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
if (isMetadataNode) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Bootstrapping metadata");
- }
- MetadataNode.INSTANCE.initialize(runtimeContext);
-
- proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
- if (proxy == null) {
- throw new IllegalStateException("Metadata node cannot access distributed state");
- }
-
- //This is a special case, we just give the metadataNode directly.
- //This way we can delay the registration of the metadataNode until
- //it is completely initialized.
- MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
- MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
- systemState == SystemState.NEW_UNIVERSE);
- MetadataBootstrap.startDDLRecovery();
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Metadata node bound");
- }
+ runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE);
}
-
ExternalLibraryBootstrap.setUpExternaLibraries(isMetadataNode);
+
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting lifecycle components");
}
@@ -267,9 +241,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
if (isMetadataNode) {
- IMetadataNode stub = null;
- stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
- proxy.setMetadataNode(stub);
+ runtimeContext.exportMetadataNodeStub();
}
//Clean any temporary files
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 095ef1b..aeaef59 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -29,14 +29,17 @@ import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hyracks.api.messages.IMessage;
-import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
-public class CCMessageBroker implements IMessageBroker {
+public class CCMessageBroker implements ICCMessageBroker {
private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
private final AtomicLong globalResourceId = new AtomicLong(0);
@@ -58,6 +61,12 @@ public class CCMessageBroker implements IMessageBroker {
case REPORT_MAX_RESOURCE_ID_RESPONSE:
handleReportResourceMaxIdResponse(message, nodeId);
break;
+ case TAKEOVER_PARTITIONS_RESPONSE:
+ handleTakeoverPartitionsResponse(message);
+ break;
+ case TAKEOVER_METADATA_NODE_RESPONSE:
+ handleTakeoverMetadataNodeResponse(message);
+ break;
default:
LOGGER.warning("Unknown message: " + absMessage.getMessageType());
break;
@@ -89,7 +98,8 @@ public class CCMessageBroker implements IMessageBroker {
nodesReportedMaxResourceId.add(nodeId);
}
- private void sendApplicationMessageToNC(IMessage msg, String nodeId) throws Exception {
+ @Override
+ public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception {
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
NodeControllerState state = nodeMap.get(nodeId);
state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
@@ -106,4 +116,14 @@ public class CCMessageBroker implements IMessageBroker {
}
}
}
+
+ private void handleTakeoverPartitionsResponse(IMessage message) {
+ TakeoverPartitionsResponseMessage msg = (TakeoverPartitionsResponseMessage) message;
+ AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(msg);
+ }
+
+ private void handleTakeoverMetadataNodeResponse(IMessage message) {
+ TakeoverMetadataNodeResponseMessage msg = (TakeoverMetadataNodeResponseMessage) message;
+ AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(msg);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 001771e..8f8723e 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -25,9 +25,13 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.messaging.AbstractApplicationMessage;
import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -75,11 +79,40 @@ public class NCMessageBroker implements INCMessageBroker {
case REPORT_MAX_RESOURCE_ID_REQUEST:
reportMaxResourceId();
break;
+ case TAKEOVER_PARTITIONS_REQUEST:
+ handleTakeoverPartitons(message);
+ break;
+ case TAKEOVER_METADATA_NODE_REQUEST:
+ handleTakeoverMetadataNode(message);
+ break;
default:
break;
}
}
+ private void handleTakeoverPartitons(IMessage message) throws Exception {
+ TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
+ IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+ .getApplicationObject();
+ IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+ remoteRecoeryManager.takeoverPartitons(msg.getFailedNode(), msg.getPartitions());
+ //send response after takeover is completed
+ TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
+ appContext.getTransactionSubsystem().getId(), msg.getPartitions());
+ sendMessage(reponse, null);
+ }
+
+ private void handleTakeoverMetadataNode(IMessage message) throws Exception {
+ IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+ .getApplicationObject();
+ appContext.initializeMetadata(false);
+ appContext.exportMetadataNodeStub();
+ //send response after takeover is completed
+ TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
+ appContext.getTransactionSubsystem().getId());
+ sendMessage(reponse, null);
+ }
+
@Override
public void reportMaxResourceId() throws Exception {
IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 3386252..975180b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.api;
import java.io.IOException;
+import java.rmi.RemoteException;
import java.util.List;
import java.util.concurrent.Executor;
@@ -89,4 +90,17 @@ public interface IAsterixAppRuntimeContext {
public IReplicationChannel getReplicationChannel();
public void initializeResourceIdFactory() throws HyracksDataException;
+
+ /**
+ * Exports the metadata node to the metadata RMI port.
+ * @throws RemoteException
+ */
+ public void exportMetadataNodeStub() throws RemoteException;
+
+ /**
+ * Initializes the metadata node and bootstraps the metadata.
+ * @param newUniverse
+ * @throws Exception
+ */
+ public void initializeMetadata(boolean newUniverse) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
new file mode 100644
index 0000000..48b1e73
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.asterix.common.api.IClusterEventsSubscriber;
+
+public interface IGlobalRecoveryMaanger extends IClusterEventsSubscriber {
+
+ /**
+ * Starts the global recovery process if the cluster state changed to ACTIVE.
+ */
+ public void startGlobalRecovery();
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 8e2c4e7..473a163 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -65,4 +65,8 @@ public class AsterixMetadataProperties extends AbstractAsterixProperties {
public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
return accessor.getClusterPartitions();
}
+
+ public Map<String, String> getTransactionLogDirs() {
+ return accessor.getTransactionLogDirs();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 1ef7e3e..fa5b503 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -36,7 +36,6 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
private static int REPLICATION_TIME_OUT_DEFAULT = 15;
private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
- private static final String REPLICATION_STORE_DEFAULT = "asterix-replication";
private final String NODE_NAME_PREFIX;
private final Cluster cluster;
@@ -102,8 +101,8 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
}
if (nodeIndex == -1) {
- LOGGER.log(Level.WARNING, "Could not find node " + getRealCluserNodeID(nodeId)
- + " in cluster configurations");
+ LOGGER.log(Level.WARNING,
+ "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
return null;
}
@@ -179,13 +178,6 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
return replicaIds;
}
- public String getReplicationStore() {
- if (cluster != null) {
- return cluster.getDataReplication().getReplicationStore();
- }
- return REPLICATION_STORE_DEFAULT;
- }
-
public int getReplicationFactor() {
if (cluster != null) {
if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
@@ -202,5 +194,4 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
}
return REPLICATION_TIME_OUT_DEFAULT;
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
index 8dc3efe..a5cd72b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.dataflow;
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.common.IStorageManagerInterface;
@@ -48,4 +49,6 @@ public interface IAsterixApplicationContextInfo {
* @return ICCApplicationContext implementation instance
*/
public ICCApplicationContext getCCApplicationContext();
+
+ public IGlobalRecoveryMaanger getGlobalRecoveryManager();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..78b86a8
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.TAKEOVER_METADATA_NODE_REQUEST;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
new file mode 100644
index 0000000..78f7429
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+
+ public TakeoverMetadataNodeResponseMessage(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..abfa7d2
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+/**
+ * Application message of type {@code TAKEOVER_PARTITIONS_REQUEST}. It carries a request id, a node id,
+ * the id of a failed node and the partitions to take over.
+ */
+public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String failedNode;
+    private final long requestId;
+    private final String nodeId;
+
+    /**
+     * @param requestId
+     *            the id of this request
+     * @param nodeId
+     *            the node id carried by this request
+     * @param failedNode
+     *            the id of the failed node
+     * @param partitionsToTakeover
+     *            the partitions to take over; the array is stored as-is (not copied)
+     */
+    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, String failedNode,
+            Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.failedNode = failedNode;
+        this.partitions = partitionsToTakeover;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_PARTITIONS_REQUEST;
+    }
+
+    /**
+     * @return the partitions array passed to the constructor (the same instance, not a copy)
+     */
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * @return the id of this request
+     */
+    public long getRequestId() {
+        return requestId;
+    }
+
+    /**
+     * @return the id of the failed node
+     */
+    public String getFailedNode() {
+        return failedNode;
+    }
+
+    /**
+     * @return the node id carried by this request
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Renders the request as
+     * {@code "Request ID: <id> Node ID: <node> Failed Node: <failed> Partitions: <p1>,<p2>,..."}.
+     * The partition list is comma-separated with no trailing comma; an empty array yields nothing
+     * after {@code "Partitions: "}.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Request ID: ").append(requestId);
+        sb.append(" Node ID: ").append(nodeId);
+        sb.append(" Failed Node: ").append(failedNode);
+        sb.append(" Partitions: ");
+        // Write the separator before every element except the first. The previous code tried to strip
+        // a trailing comma with StringBuilder.charAt(), which only reads a character and removes nothing.
+        for (int i = 0; i < partitions.length; i++) {
+            if (i > 0) {
+                sb.append(',');
+            }
+            sb.append(partitions[i]);
+        }
+        return sb.toString();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
new file mode 100644
index 0000000..86eb3cb
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+/**
+ * Application message of type {@code TAKEOVER_PARTITIONS_RESPONSE}. It carries a request id, a node id
+ * and an array of partitions.
+ */
+public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String nodeId;
+    private final long requestId;
+
+    /**
+     * @param requestId
+     *            the request id carried by this response
+     * @param nodeId
+     *            the node id carried by this response
+     * @param partitionsToTakeover
+     *            the partitions carried by this response; the array is stored as-is (not copied)
+     */
+    public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+        this.partitions = partitionsToTakeover;
+        this.nodeId = nodeId;
+        this.requestId = requestId;
+    }
+
+    /**
+     * @return the request id passed to the constructor
+     */
+    public long getRequestId() {
+        return requestId;
+    }
+
+    /**
+     * @return the node id passed to the constructor
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return the partitions array passed to the constructor (the same instance, not a copy)
+     */
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * @return always {@code ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE}
+     */
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 61ab7cd..57a0dae 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -26,7 +26,11 @@ public interface IApplicationMessage extends IMessage {
RESOURCE_ID_REQUEST,
RESOURCE_ID_RESPONSE,
REPORT_MAX_RESOURCE_ID_REQUEST,
- REPORT_MAX_RESOURCE_ID_RESPONSE
+ REPORT_MAX_RESOURCE_ID_RESPONSE,
+ TAKEOVER_PARTITIONS_REQUEST,
+ TAKEOVER_PARTITIONS_RESPONSE,
+ TAKEOVER_METADATA_NODE_REQUEST,
+ TAKEOVER_METADATA_NODE_RESPONSE
}
public abstract ApplicationMessageType getMessageType();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
new file mode 100644
index 0000000..7dafbd5
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+/**
+ * An {@link IMessageBroker} that can additionally send an application message to a node
+ * identified by its id.
+ */
+public interface ICCMessageBroker extends IMessageBroker {
+
+    /**
+     * Sends the passed message to the node identified by {@code nodeId}.
+     *
+     * @param msg
+     *            the application message to send
+     * @param nodeId
+     *            the id of the destination node
+     * @throws Exception
+     *             on failure; the exact conditions depend on the implementation
+     */
+    void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 63d29a0..ecc9494 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -18,8 +18,24 @@
*/
package org.apache.asterix.common.replication;
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+
public interface IRemoteRecoveryManager {
+ /**
+ * Attempts to perform the remote recovery process from an active remote replica.
+ */
public void performRemoteRecovery();
+ /**
+ * Performs the partitions takeover process from the {@code failedNode}
+ * @param failedNode
+ * @param partitions
+ * @throws IOException
+ * @throws ACIDException
+ */
+ public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException;
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index c796f37..f13d300 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -22,10 +22,15 @@ import java.util.Set;
public interface IReplicaResourcesManager {
- public String getIndexPath(String backupNodeId, int IODeviceNum, String dataverse, String dataset);
-
- public String getLocalStorageFolder();
-
+ /**
+ * @param remoteNodes
+ * @return The minimum LSN of all indexes that belong to {@code remoteNodes}.
+ */
public long getMinRemoteLSN(Set<String> remoteNodes);
+ /**
+ * @param partitions
+ * @return the minimum LSN of all indexes that belong to {@code partitions}.
+ */
+ public long getPartitionsMinLSN(Integer[] partitions);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 9ea9957..a2b7a82 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -111,4 +111,14 @@ public interface IRecoveryManager {
* @throws HyracksDataException
*/
public long getLocalMinFirstLSN() throws HyracksDataException;
+
+ /**
+ * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
+ * @param partitions
+ * @param lowWaterMarkLSN
+ * @param failedNode
+ * @throws IOException
+ * @throws ACIDException
+ */
+ public void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode) throws IOException, ACIDException;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index acfb9d5..48e42bd 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -56,6 +56,14 @@ public class StoragePathUtil {
}
public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
- return dataverseName + File.separator + datasetName + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + idxName;
+ return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
+ }
+
+ public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
+ return dataverseName + File.separator + fullIndexName;
+ }
+
+ private static String prepareFullIndexName(String datasetName, String idxName) {
+ return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/resources/schema/cluster.xsd
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 872c959..e0605f0 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -47,7 +47,7 @@
<xs:element name="enabled" type="xs:boolean" />
<xs:element name="replication_port" type="xs:integer" />
<xs:element name="replication_factor" type="xs:integer" />
- <xs:element name="replication_store" type="xs:string" />
+ <xs:element name="auto_failover" type="xs:boolean" />
<xs:element name="replication_time_out" type="xs:integer" />
<!-- definition of complex elements -->
@@ -82,7 +82,7 @@
<xs:element ref="cl:enabled" />
<xs:element ref="cl:replication_port" />
<xs:element ref="cl:replication_factor" />
- <xs:element ref="cl:replication_store" />
+ <xs:element ref="cl:auto_failover" />
<xs:element ref="cl:replication_time_out" />
</xs:sequence>
</xs:complexType>