You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/01/05 16:10:37 UTC
[09/10] asterixdb git commit: [ASTERIXDB-2195][REPL] Replace Static Replication
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
index 3c5442e..49f4b32 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
@@ -32,6 +32,9 @@ import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
+import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
+import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
@@ -43,8 +46,8 @@ import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.metadata.MetadataManager;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -59,6 +62,7 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
private String metadataNodeId;
private Set<String> pendingStartupCompletionNodes = new HashSet<>();
private ICCMessageBroker messageBroker;
+ private boolean replicationEnabled;
@Override
public void notifyNodeJoin(String nodeId) throws HyracksDataException {
@@ -84,15 +88,19 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
case REGISTRATION_TASKS_RESULT:
process((NCLifecycleTaskReportMessage) message);
break;
+ case METADATA_NODE_RESPONSE:
+ process((MetadataNodeResponseMessage) message);
+ break;
default:
throw new RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, message.getType().name());
}
}
@Override
- public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, IReplicationStrategy replicationStrategy) {
+ public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, boolean replicationEnabled) {
NoFaultToleranceStrategy ft = new NoFaultToleranceStrategy();
ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker();
+ ft.replicationEnabled = replicationEnabled;
return ft;
}
@@ -141,10 +149,14 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
final List<INCLifecycleTask> tasks = new ArrayList<>();
if (state == SystemState.CORRUPTED) {
//need to perform local recovery for node partitions
- LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
- .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+ LocalRecoveryTask rt = new LocalRecoveryTask(
+ Arrays.asList(clusterManager.getNodePartitions(nodeId)).stream()
+ .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
tasks.add(rt);
}
+ if (replicationEnabled) {
+ tasks.add(new StartReplicationServiceTask());
+ }
if (isMetadataNode) {
tasks.add(new MetadataBootstrapTask());
}
@@ -168,4 +180,42 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
tasks.add(new ReportLocalCountersTask());
return tasks;
}
+
+ @Override
+ public void notifyMetadataNodeChange(String node) throws HyracksDataException {
+ if (metadataNodeId.equals(node)) { // the requested node is already recorded as the metadata node
+ return;
+ }
+ // the current metadata node is active: first ask it (request flag false) to unbind its metadata proxy object
+ if (clusterManager.isMetadataNodeActive()) {
+ MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(false);
+ try {
+ messageBroker.sendApplicationMessageToNC(msg, metadataNodeId);
+ // record the new node now; process(MetadataNodeResponseMessage) asks it to take over once the old node responds as not exported
+ metadataNodeId = node;
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ } else { // no active metadata node to unbind: request the takeover from the new node right away
+ requestMetadataNodeTakeover(node);
+ }
+ }
+
+ private void process(MetadataNodeResponseMessage response) throws HyracksDataException {
+ // rebind the metadata node first since it might be changing, then record the export state the responder reported
+ MetadataManager.INSTANCE.rebindMetadataNode();
+ clusterManager.updateMetadataNode(response.getNodeId(), response.isExported());
+ if (!response.isExported()) { // responder reports not exported: ask the node stored in metadataNodeId to take over
+ requestMetadataNodeTakeover(metadataNodeId);
+ }
+ }
+
+ private void requestMetadataNodeTakeover(String node) throws HyracksDataException {
+ MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(true); // NOTE(review): true presumably requests export (false is used to unbind) - confirm
+ try {
+ messageBroker.sendApplicationMessageToNC(msg, node);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
deleted file mode 100644
index ab938d2..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage;
-import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
-
-public class NodeFailbackPlan {
-
- public enum FailbackPlanState {
- /**
- * Initial state while selecting the nodes that will participate
- * in the node failback plan.
- */
- PREPARING,
- /**
- * Once a pending {@link PreparePartitionsFailbackRequestMessage} request is added,
- * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE to indicate
- * a response is expected and need to wait for it.
- */
- PENDING_PARTICIPANT_REPONSE,
- /**
- * Upon receiving the last {@link PreparePartitionsFailbackResponseMessage} response,
- * the state changes from PENDING_PARTICIPANT_REPONSE to PENDING_COMPLETION to indicate
- * the need to send {@link CompleteFailbackRequestMessage} to the failing back node.
- */
- PENDING_COMPLETION,
- /**
- * if any of the participants fail or the failing back node itself fails during
- * and of these states (PREPARING, PENDING_PARTICIPANT_REPONSE, PENDING_COMPLETION),
- * the state is changed to FAILED.
- */
- FAILED,
- /**
- * if the state is FAILED, and all pending responses (if any) have been received,
- * the state changes from FAILED to PENDING_ROLLBACK to indicate the need to revert
- * the effects of this plan (if any).
- */
- PENDING_ROLLBACK
- }
-
- private static long planIdGenerator = 0;
- private long planId;
- private final String nodeId;
- private final Set<String> participants;
- private final Map<Integer, String> partition2nodeMap;
- private String nodeToReleaseMetadataManager;
- private int requestId;
- private Map<Integer, PreparePartitionsFailbackRequestMessage> pendingRequests;
- private FailbackPlanState state;
-
- public static NodeFailbackPlan createPlan(String nodeId) {
- return new NodeFailbackPlan(planIdGenerator++, nodeId);
- }
-
- private NodeFailbackPlan(long planId, String nodeId) {
- this.planId = planId;
- this.nodeId = nodeId;
- participants = new HashSet<>();
- partition2nodeMap = new HashMap<>();
- pendingRequests = new HashMap<>();
- state = FailbackPlanState.PREPARING;
- }
-
- public synchronized void addPartitionToFailback(int partitionId, String currentActiveNode) {
- partition2nodeMap.put(partitionId, currentActiveNode);
- }
-
- public synchronized void addParticipant(String nodeId) {
- participants.add(nodeId);
- }
-
- public synchronized void notifyNodeFailure(String failedNode) {
- if (participants.contains(failedNode)) {
- if (state == FailbackPlanState.PREPARING) {
- state = FailbackPlanState.FAILED;
- } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) {
- /**
- * if there is any pending request from this failed node,
- * it should be marked as completed and the plan should be marked as failed
- */
- Set<Integer> failedRequests = new HashSet<>();
- for (PreparePartitionsFailbackRequestMessage request : pendingRequests.values()) {
- if (request.getNodeID().equals(failedNode)) {
- failedRequests.add(request.getRequestId());
- }
- }
-
- if (!failedRequests.isEmpty()) {
- state = FailbackPlanState.FAILED;
- for (Integer failedRequestId : failedRequests) {
- markRequestCompleted(failedRequestId);
- }
- }
- }
- } else if (nodeId.equals(failedNode)) {
- //if the failing back node is the failed node itself
- state = FailbackPlanState.FAILED;
- updateState();
- }
- }
-
- public synchronized Set<Integer> getPartitionsToFailback() {
- return new HashSet<>(partition2nodeMap.keySet());
- }
-
- public synchronized void addPendingRequest(PreparePartitionsFailbackRequestMessage msg) {
- //if this is the first request
- if (pendingRequests.size() == 0) {
- state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE;
- }
- pendingRequests.put(msg.getRequestId(), msg);
- }
-
- public synchronized void markRequestCompleted(int requestId) {
- pendingRequests.remove(requestId);
- updateState();
- }
-
- private void updateState() {
- if (pendingRequests.size() == 0) {
- switch (state) {
- case PREPARING:
- case FAILED:
- state = FailbackPlanState.PENDING_ROLLBACK;
- break;
- case PENDING_PARTICIPANT_REPONSE:
- state = FailbackPlanState.PENDING_COMPLETION;
- break;
- default:
- break;
- }
- }
- }
-
- public synchronized Set<PreparePartitionsFailbackRequestMessage> getPlanFailbackRequests() {
- Set<PreparePartitionsFailbackRequestMessage> node2Partitions = new HashSet<>();
- /**
- * for each participant, construct a request with the partitions
- * that will be failed back or flushed.
- */
- for (String participant : participants) {
- Set<Integer> partitionToPrepareForFailback = new HashSet<>();
- partition2nodeMap.forEach((key, value) -> {
- if (value.equals(participant)) {
- partitionToPrepareForFailback.add(key);
- }
- });
- PreparePartitionsFailbackRequestMessage msg = new PreparePartitionsFailbackRequestMessage(planId,
- requestId++, participant, partitionToPrepareForFailback);
- if (participant.equals(nodeToReleaseMetadataManager)) {
- msg.setReleaseMetadataNode(true);
- }
- node2Partitions.add(msg);
- }
- return node2Partitions;
- }
-
- public synchronized CompleteFailbackRequestMessage getCompleteFailbackRequestMessage() {
- return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, getPartitionsToFailback());
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public long getPlanId() {
- return planId;
- }
-
- public void setNodeToReleaseMetadataManager(String nodeToReleaseMetadataManager) {
- this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager;
- }
-
- public synchronized FailbackPlanState getState() {
- return state;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Plan ID: " + planId);
- sb.append(" Failing back node: " + nodeId);
- sb.append(" Participants: " + participants);
- sb.append(" Partitions to Failback: " + partition2nodeMap.keySet());
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
deleted file mode 100644
index ad4afd0..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage implements INcAddressedMessage {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = LogManager.getLogger();
- private final Set<Integer> partitions;
- private final String nodeId;
-
- public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
- super(planId, requestId);
- this.nodeId = nodeId;
- this.partitions = partitions;
- }
-
- public Set<Integer> getPartitions() {
- return partitions;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(CompleteFailbackRequestMessage.class.getSimpleName());
- sb.append(" Plan ID: " + planId);
- sb.append(" Node ID: " + nodeId);
- sb.append(" Partitions: " + partitions);
- return sb.toString();
- }
-
- @Override
- public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
- INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker();
- HyracksDataException hde = null;
- try {
- IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
- remoteRecoeryManager.completeFailbackProcess();
- } catch (IOException | InterruptedException e) {
- LOGGER.log(Level.ERROR, "Failure during completion of failback process", e);
- hde = HyracksDataException.create(e);
- } finally {
- CompleteFailbackResponseMessage reponse =
- new CompleteFailbackResponseMessage(planId, requestId, partitions);
- try {
- broker.sendMessageToCC(reponse);
- } catch (Exception e) {
- LOGGER.log(Level.ERROR, "Failure sending message to CC", e);
- hde = HyracksDataException.suppress(hde, e);
- }
- }
- if (hde != null) {
- throw hde;
- }
- }
-
- @Override
- public MessageType getType() {
- return MessageType.COMPLETE_FAILBACK_REQUEST;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
deleted file mode 100644
index 0c5678f..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.util.Set;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage implements ICcAddressedMessage {
-
- private static final long serialVersionUID = 1L;
- private final Set<Integer> partitions;
-
- public CompleteFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
- super(planId, requestId);
- this.partitions = partitions;
- }
-
- public Set<Integer> getPartitions() {
- return partitions;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(CompleteFailbackResponseMessage.class.getSimpleName());
- sb.append(" Plan ID: " + planId);
- sb.append(" Partitions: " + partitions);
- return sb.toString();
- }
-
- @Override
- public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
- }
-
- @Override
- public MessageType getType() {
- return MessageType.COMPLETE_FAILBACK_RESPONSE;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
deleted file mode 100644
index 6b85050..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.rmi.RemoteException;
-import java.util.Set;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage
- implements INcAddressedMessage {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = LogManager.getLogger();
- private final Set<Integer> partitions;
- private boolean releaseMetadataNode = false;
- private final String nodeID;
-
- public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
- super(planId, requestId);
- this.nodeID = nodeId;
- this.partitions = partitions;
- }
-
- public Set<Integer> getPartitions() {
- return partitions;
- }
-
- public boolean isReleaseMetadataNode() {
- return releaseMetadataNode;
- }
-
- public void setReleaseMetadataNode(boolean releaseMetadataNode) {
- this.releaseMetadataNode = releaseMetadataNode;
- }
-
- public String getNodeID() {
- return nodeID;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(PreparePartitionsFailbackRequestMessage.class.getSimpleName());
- sb.append(" Plan ID: " + planId);
- sb.append(" Partitions: " + partitions);
- sb.append(" releaseMetadataNode: " + releaseMetadataNode);
- return sb.toString();
- }
-
- @Override
- public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
- INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker();
- /**
- * if the metadata partition will be failed back
- * we need to flush and close all datasets including metadata datasets
- * otherwise we need to close all non-metadata datasets and flush metadata datasets
- * so that their memory components will be copied to the failing back node
- */
- if (releaseMetadataNode) {
- appContext.getDatasetLifecycleManager().closeAllDatasets();
- //remove the metadata node stub from RMI registry
- try {
- appContext.unexportMetadataNodeStub();
- } catch (RemoteException e) {
- LOGGER.log(Level.ERROR, "Failed unexporting metadata stub", e);
- throw HyracksDataException.create(e);
- }
- } else {
- //close all non-metadata datasets
- appContext.getDatasetLifecycleManager().closeUserDatasets();
- //flush the remaining metadata datasets that were not closed
- appContext.getDatasetLifecycleManager().flushAllDatasets();
- }
-
- //mark the partitions to be closed as inactive
- PersistentLocalResourceRepository localResourceRepo =
- (PersistentLocalResourceRepository) appContext.getLocalResourceRepository();
- for (Integer partitionId : partitions) {
- localResourceRepo.addInactivePartition(partitionId);
- }
-
- //send response after partitions prepared for failback
- PreparePartitionsFailbackResponseMessage reponse =
- new PreparePartitionsFailbackResponseMessage(planId, requestId, partitions);
- try {
- broker.sendMessageToCC(reponse);
- } catch (Exception e) {
- LOGGER.log(Level.ERROR, "Failed sending message to cc", e);
- throw HyracksDataException.create(e);
- }
- }
-
- @Override
- public MessageType getType() {
- return MessageType.PREPARE_FAILBACK_REQUEST;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
deleted file mode 100644
index bea1039..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.util.Set;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage
- implements ICcAddressedMessage {
-
- private static final long serialVersionUID = 1L;
- private final Set<Integer> partitions;
-
- public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
- super(planId, requestId);
- this.partitions = partitions;
- }
-
- public Set<Integer> getPartitions() {
- return partitions;
- }
-
- @Override
- public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
- }
-
- @Override
- public String toString() {
- return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitions.toString();
- }
-
- @Override
- public MessageType getType() {
- return MessageType.PREPARE_FAILBACK_RESPONSE;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
deleted file mode 100644
index beac2b5..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.util.Set;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class ReplayPartitionLogsRequestMessage implements INCLifecycleMessage, INcAddressedMessage {
-
- private static final Logger LOGGER = LogManager.getLogger();
- private static final long serialVersionUID = 1L;
- private final Set<Integer> partitions;
-
- public ReplayPartitionLogsRequestMessage(Set<Integer> partitions) {
- this.partitions = partitions;
- }
-
- @Override
- public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
- NodeControllerService ncs = (NodeControllerService) appContext.getServiceContext().getControllerService();
- // Replay the logs for these partitions and flush any impacted dataset
- appContext.getRemoteRecoveryManager().replayReplicaPartitionLogs(partitions, true);
-
- INCMessageBroker broker = (INCMessageBroker) ncs.getContext().getMessageBroker();
- ReplayPartitionLogsResponseMessage reponse = new ReplayPartitionLogsResponseMessage(ncs.getId(), partitions);
- try {
- broker.sendMessageToCC(reponse);
- } catch (Exception e) {
- LOGGER.log(Level.ERROR, "Failed sending message to cc", e);
- throw HyracksDataException.create(e);
- }
- }
-
- @Override
- public MessageType getType() {
- return MessageType.REPLAY_LOGS_REQUEST;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
deleted file mode 100644
index e05fd47..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.util.Set;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ReplayPartitionLogsResponseMessage implements INCLifecycleMessage, ICcAddressedMessage {
-
- private static final long serialVersionUID = 1L;
- private final Set<Integer> partitions;
- private final String nodeId;
-
- public ReplayPartitionLogsResponseMessage(String nodeId, Set<Integer> partitions) {
- this.partitions = partitions;
- this.nodeId = nodeId;
- }
-
- @Override
- public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
- }
-
- public Set<Integer> getPartitions() {
- return partitions;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public MessageType getType() {
- return MessageType.REPLAY_LOGS_RESPONSE;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
deleted file mode 100644
index 86be516..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.io.IOException;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class TakeoverPartitionsRequestMessage implements INCLifecycleMessage, INcAddressedMessage {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = LogManager.getLogger();
- private final Integer[] partitions;
- private final long requestId;
- private final String nodeId;
-
- public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
- this.requestId = requestId;
- this.nodeId = nodeId;
- this.partitions = partitionsToTakeover;
- }
-
- public Integer[] getPartitions() {
- return partitions;
- }
-
- public long getRequestId() {
- return requestId;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(TakeoverPartitionsRequestMessage.class.getSimpleName());
- sb.append(" Request ID: " + requestId);
- sb.append(" Node ID: " + nodeId);
- sb.append(" Partitions: ");
- for (Integer partitionId : partitions) {
- sb.append(partitionId + ",");
- }
- //remove last comma
- sb.charAt(sb.length() - 1);
- return sb.toString();
- }
-
- @Override
- public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
- INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker();
- //if the NC is shutting down, it should ignore takeover partitions request
- if (!appContext.isShuttingdown()) {
- HyracksDataException hde = null;
- try {
- IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
- remoteRecoeryManager.takeoverPartitons(partitions);
- } catch (IOException | ACIDException e) {
- LOGGER.log(Level.ERROR, "Failure taking over partitions", e);
- hde = HyracksDataException.suppress(hde, e);
- } finally {
- //send response after takeover is completed
- TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId,
- appContext.getTransactionSubsystem().getId(), partitions);
- try {
- broker.sendMessageToCC(reponse);
- } catch (Exception e) {
- LOGGER.log(Level.ERROR, "Failure taking over partitions", e);
- hde = HyracksDataException.suppress(hde, e);
- }
- }
- if (hde != null) {
- throw hde;
- }
- }
- }
-
- @Override
- public MessageType getType() {
- return MessageType.TAKEOVER_PARTITION_REQUEST;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
deleted file mode 100644
index d9484f9..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class TakeoverPartitionsResponseMessage implements INCLifecycleMessage, ICcAddressedMessage {
-
- private static final long serialVersionUID = 1L;
- private final Integer[] partitions;
- private final String nodeId;
- private final long requestId;
-
- public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
- this.requestId = requestId;
- this.nodeId = nodeId;
- this.partitions = partitionsToTakeover;
- }
-
- public Integer[] getPartitions() {
- return partitions;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public long getRequestId() {
- return requestId;
- }
-
- @Override
- public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
- }
-
- @Override
- public String toString() {
- return TakeoverPartitionsResponseMessage.class.getSimpleName();
- }
-
- @Override
- public MessageType getType() {
- return MessageType.TAKEOVER_PARTITION_RESPONSE;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 622e28f..c2a0fc1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -142,9 +142,8 @@ public class CCApplication extends BaseCCApplication {
ILibraryManager libraryManager = new ExternalLibraryManager();
ReplicationProperties repProp = new ReplicationProperties(
PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
- IReplicationStrategy repStrategy = ReplicationStrategyFactory.create(repProp.getReplicationStrategy(), repProp,
- getConfigManager());
- IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory.create(ccServiceCtx, repProp, repStrategy);
+ IFaultToleranceStrategy ftStrategy =
+ FaultToleranceStrategyFactory.create(ccServiceCtx, repProp.isReplicationEnabled());
ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
componentProvider = new StorageComponentProvider();
GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
deleted file mode 100644
index ddd0967..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.util;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.runtime.message.ReplicaEventMessage;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-import org.apache.hyracks.api.config.IOption;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class FaultToleranceUtil {
-
- private static final Logger LOGGER = LogManager.getLogger();
-
- private FaultToleranceUtil() {
- throw new AssertionError();
- }
-
- public static void notifyImpactedReplicas(String nodeId, ClusterEventType event,
- IClusterStateManager clusterManager, ICCMessageBroker messageBroker,
- IReplicationStrategy replicationStrategy) {
- List<String> primaryRemoteReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
- .map(Replica::getId).collect(Collectors.toList());
- String nodeIdAddress = StringUtils.EMPTY;
- int nodePort = -1;
- Map<String, Map<IOption, Object>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
-
- // In case the node joined with a new IP address, we need to send it to the other replicas
- if (event == ClusterEventType.NODE_JOIN) {
- nodeIdAddress = (String) activeNcConfiguration.get(nodeId).get(NCConfig.Option.REPLICATION_PUBLIC_ADDRESS);
- nodePort = (int) activeNcConfiguration.get(nodeId).get(NCConfig.Option.REPLICATION_PUBLIC_PORT);
- }
- ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, nodePort, event);
- for (String replica : primaryRemoteReplicas) {
- // If the remote replica is alive, send the event
- if (activeNcConfiguration.containsKey(replica)) {
- try {
- messageBroker.sendApplicationMessageToNC(msg, replica);
- } catch (Exception e) {
- LOGGER.warn("Failed sending an application message to an NC", e);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/resources/cc-rep.conf
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/resources/cc-rep.conf b/asterixdb/asterix-app/src/main/resources/cc-rep.conf
index 9c093dc..bfca677 100644
--- a/asterixdb/asterix-app/src/main/resources/cc-rep.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc-rep.conf
@@ -48,5 +48,4 @@ heartbeat.period=2000
[common]
log.level = INFO
replication.enabled=true
-replication.strategy=metadata_only
-replication.factor=2
+replication.strategy=all
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.1.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.1.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.1.sto.cmd
new file mode 100644
index 0000000..1947749
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.1.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2 /addReplica 2 asterix_nc1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.10.post.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.10.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.10.post.http
new file mode 100644
index 0000000..a3ea801
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.10.post.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/admin/cluster/partition/master?partition=3&node=asterix_nc1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.11.pollget.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.11.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.11.pollget.http
new file mode 100644
index 0000000..32e2f78
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.11.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=30
+
+/admin/cluster/summary
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.12.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.12.query.sqlpp
new file mode 100644
index 0000000..cd777cf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.12.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+select value count(*) from LineItem;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.13.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.13.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.13.sto.cmd
new file mode 100644
index 0000000..1e192f4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.13.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2 /removeReplica 2 asterix_nc1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.14.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.14.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.14.sto.cmd
new file mode 100644
index 0000000..530432f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.14.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2 /removeReplica 3 asterix_nc1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.2.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.2.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.2.sto.cmd
new file mode 100644
index 0000000..f3810f8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.2.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2 /addReplica 3 asterix_nc1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.3.pollget.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.3.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.3.pollget.http
new file mode 100644
index 0000000..4ea16d7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.3.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=30
+
+nc:asterix_nc2 /admin/storage/partition/2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.4.pollget.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.4.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.4.pollget.http
new file mode 100644
index 0000000..22558bc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.4.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=30
+
+nc:asterix_nc2 /admin/storage/partition/3
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.5.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.5.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.5.ddl.sqlpp
new file mode 100644
index 0000000..071a40f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.5.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+create type tpch.LineItemType as
+ closed {
+ l_orderkey : bigint,
+ l_partkey : bigint,
+ l_suppkey : bigint,
+ l_linenumber : bigint,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+};
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.6.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.6.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.6.update.sqlpp
new file mode 100644
index 0000000..e53f462
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.6.update.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch; /* LineItem below is resolved in the tpch dataverse */
+/* Bulk-load LineItem from a pipe-delimited text file via the localfs adapter; the asterix_nc1:// prefix presumably names the node holding the file - confirm against the adapter docs. */
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+ (`format`=`delimited-text`),(`delimiter`=`|`));
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.7.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.7.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.7.sto.cmd
new file mode 100644
index 0000000..389cf68
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.7.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /promote 2 asterix_nc1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.8.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.8.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.8.sto.cmd
new file mode 100644
index 0000000..257f26a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.8.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /promote 3 asterix_nc1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.9.post.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.9.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.9.post.http
new file mode 100644
index 0000000..36e1d00
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.9.post.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/admin/cluster/partition/master?partition=2&node=asterix_nc1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.1.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.1.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.1.sto.cmd
new file mode 100644
index 0000000..7ddaa20
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.1.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /addReplica 0 asterix_nc2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.10.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.10.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.10.sto.cmd
new file mode 100644
index 0000000..71621ac
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.10.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /removeReplica 0 asterix_nc2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.2.pollget.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.2.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.2.pollget.http
new file mode 100644
index 0000000..6867a5d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.2.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=30
+
+nc:asterix_nc1 /admin/storage/partition/0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.3.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.3.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.3.ddl.sqlpp
new file mode 100644
index 0000000..15bc3c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.3.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+CREATE TYPE MyType AS { /* minimal record type: a single int field */
+ id : int
+};
+/* ds_1 stores MyType records keyed by id; this script has no USE statement, so both objects presumably land in the default dataverse - confirm. */
+CREATE DATASET ds_1(MyType) PRIMARY KEY id;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.4.get.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.4.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.4.get.http
new file mode 100644
index 0000000..09ddf42
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.4.get.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/connector?dataverseName=Metadata&datasetName=Dataset
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.5.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.5.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.5.sto.cmd
new file mode 100644
index 0000000..a5753f0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.5.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2 /promote 0 asterix_nc2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.6.post.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.6.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.6.post.http
new file mode 100644
index 0000000..2e8fc63
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.6.post.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/admin/cluster/partition/master?partition=0&node=asterix_nc2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.7.post.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.7.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.7.post.http
new file mode 100644
index 0000000..e8dca0b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.7.post.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/admin/cluster/metadataNode?node=asterix_nc2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.8.pollget.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.8.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.8.pollget.http
new file mode 100644
index 0000000..32e2f78
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.8.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=30
+
+/admin/cluster/summary
\ No newline at end of file