You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/01/05 16:10:37 UTC

[09/10] asterixdb git commit: [ASTERIXDB-2195][REPL] Replace Static Replication

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
index 3c5442e..49f4b32 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
@@ -32,6 +32,9 @@ import org.apache.asterix.app.nc.task.LocalRecoveryTask;
 import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
 import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
+import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
+import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
 import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
 import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
 import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
@@ -43,8 +46,8 @@ import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.metadata.MetadataManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -59,6 +62,7 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
     private String metadataNodeId;
     private Set<String> pendingStartupCompletionNodes = new HashSet<>();
     private ICCMessageBroker messageBroker;
+    private boolean replicationEnabled;
 
     @Override
     public void notifyNodeJoin(String nodeId) throws HyracksDataException {
@@ -84,15 +88,19 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
             case REGISTRATION_TASKS_RESULT:
                 process((NCLifecycleTaskReportMessage) message);
                 break;
+            case METADATA_NODE_RESPONSE:
+                process((MetadataNodeResponseMessage) message);
+                break;
             default:
                 throw new RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, message.getType().name());
         }
     }
 
     @Override
-    public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, IReplicationStrategy replicationStrategy) {
+    public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, boolean replicationEnabled) {
         NoFaultToleranceStrategy ft = new NoFaultToleranceStrategy();
         ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker();
+        ft.replicationEnabled = replicationEnabled;
         return ft;
     }
 
@@ -141,10 +149,14 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         if (state == SystemState.CORRUPTED) {
             //need to perform local recovery for node partitions
-            LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
-                    .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+            LocalRecoveryTask rt = new LocalRecoveryTask(
+                    Arrays.asList(clusterManager.getNodePartitions(nodeId)).stream()
+                            .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
             tasks.add(rt);
         }
+        if (replicationEnabled) {
+            tasks.add(new StartReplicationServiceTask());
+        }
         if (isMetadataNode) {
             tasks.add(new MetadataBootstrapTask());
         }
@@ -168,4 +180,42 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
         tasks.add(new ReportLocalCountersTask());
         return tasks;
     }
+
+    @Override
+    public void notifyMetadataNodeChange(String node) throws HyracksDataException {
+        if (metadataNodeId.equals(node)) {
+            return;
+        }
+        // if current metadata node is active, we need to unbind its metadata proxy object
+        if (clusterManager.isMetadataNodeActive()) {
+            MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(false);
+            try {
+                messageBroker.sendApplicationMessageToNC(msg, metadataNodeId);
+                // when the current node responds, we will bind to the new one
+                metadataNodeId = node;
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
+        } else {
+            requestMetadataNodeTakeover(node);
+        }
+    }
+
+    private void process(MetadataNodeResponseMessage response) throws HyracksDataException {
+        // rebind metadata node since it might be changing
+        MetadataManager.INSTANCE.rebindMetadataNode();
+        clusterManager.updateMetadataNode(response.getNodeId(), response.isExported());
+        if (!response.isExported()) {
+            requestMetadataNodeTakeover(metadataNodeId);
+        }
+    }
+
+    private void requestMetadataNodeTakeover(String node) throws HyracksDataException {
+        MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(true);
+        try {
+            messageBroker.sendApplicationMessageToNC(msg, node);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
deleted file mode 100644
index ab938d2..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage;
-import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
-
-public class NodeFailbackPlan {
-
-    public enum FailbackPlanState {
-        /**
-         * Initial state while selecting the nodes that will participate
-         * in the node failback plan.
-         */
-        PREPARING,
-        /**
-         * Once a pending {@link PreparePartitionsFailbackRequestMessage} request is added,
-         * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE to indicate
-         * a response is expected and need to wait for it.
-         */
-        PENDING_PARTICIPANT_REPONSE,
-        /**
-         * Upon receiving the last {@link PreparePartitionsFailbackResponseMessage} response,
-         * the state changes from PENDING_PARTICIPANT_REPONSE to PENDING_COMPLETION to indicate
-         * the need to send {@link CompleteFailbackRequestMessage} to the failing back node.
-         */
-        PENDING_COMPLETION,
-        /**
-         * if any of the participants fail or the failing back node itself fails during
-         * and of these states (PREPARING, PENDING_PARTICIPANT_REPONSE, PENDING_COMPLETION),
-         * the state is changed to FAILED.
-         */
-        FAILED,
-        /**
-         * if the state is FAILED, and all pending responses (if any) have been received,
-         * the state changes from FAILED to PENDING_ROLLBACK to indicate the need to revert
-         * the effects of this plan (if any).
-         */
-        PENDING_ROLLBACK
-    }
-
-    private static long planIdGenerator = 0;
-    private long planId;
-    private final String nodeId;
-    private final Set<String> participants;
-    private final Map<Integer, String> partition2nodeMap;
-    private String nodeToReleaseMetadataManager;
-    private int requestId;
-    private Map<Integer, PreparePartitionsFailbackRequestMessage> pendingRequests;
-    private FailbackPlanState state;
-
-    public static NodeFailbackPlan createPlan(String nodeId) {
-        return new NodeFailbackPlan(planIdGenerator++, nodeId);
-    }
-
-    private NodeFailbackPlan(long planId, String nodeId) {
-        this.planId = planId;
-        this.nodeId = nodeId;
-        participants = new HashSet<>();
-        partition2nodeMap = new HashMap<>();
-        pendingRequests = new HashMap<>();
-        state = FailbackPlanState.PREPARING;
-    }
-
-    public synchronized void addPartitionToFailback(int partitionId, String currentActiveNode) {
-        partition2nodeMap.put(partitionId, currentActiveNode);
-    }
-
-    public synchronized void addParticipant(String nodeId) {
-        participants.add(nodeId);
-    }
-
-    public synchronized void notifyNodeFailure(String failedNode) {
-        if (participants.contains(failedNode)) {
-            if (state == FailbackPlanState.PREPARING) {
-                state = FailbackPlanState.FAILED;
-            } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) {
-                /**
-                 * if there is any pending request from this failed node,
-                 * it should be marked as completed and the plan should be marked as failed
-                 */
-                Set<Integer> failedRequests = new HashSet<>();
-                for (PreparePartitionsFailbackRequestMessage request : pendingRequests.values()) {
-                    if (request.getNodeID().equals(failedNode)) {
-                        failedRequests.add(request.getRequestId());
-                    }
-                }
-
-                if (!failedRequests.isEmpty()) {
-                    state = FailbackPlanState.FAILED;
-                    for (Integer failedRequestId : failedRequests) {
-                        markRequestCompleted(failedRequestId);
-                    }
-                }
-            }
-        } else if (nodeId.equals(failedNode)) {
-            //if the failing back node is the failed node itself
-            state = FailbackPlanState.FAILED;
-            updateState();
-        }
-    }
-
-    public synchronized Set<Integer> getPartitionsToFailback() {
-        return new HashSet<>(partition2nodeMap.keySet());
-    }
-
-    public synchronized void addPendingRequest(PreparePartitionsFailbackRequestMessage msg) {
-        //if this is the first request
-        if (pendingRequests.size() == 0) {
-            state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE;
-        }
-        pendingRequests.put(msg.getRequestId(), msg);
-    }
-
-    public synchronized void markRequestCompleted(int requestId) {
-        pendingRequests.remove(requestId);
-        updateState();
-    }
-
-    private void updateState() {
-        if (pendingRequests.size() == 0) {
-            switch (state) {
-                case PREPARING:
-                case FAILED:
-                    state = FailbackPlanState.PENDING_ROLLBACK;
-                    break;
-                case PENDING_PARTICIPANT_REPONSE:
-                    state = FailbackPlanState.PENDING_COMPLETION;
-                    break;
-                default:
-                    break;
-            }
-        }
-    }
-
-    public synchronized Set<PreparePartitionsFailbackRequestMessage> getPlanFailbackRequests() {
-        Set<PreparePartitionsFailbackRequestMessage> node2Partitions = new HashSet<>();
-        /**
-         * for each participant, construct a request with the partitions
-         * that will be failed back or flushed.
-         */
-        for (String participant : participants) {
-            Set<Integer> partitionToPrepareForFailback = new HashSet<>();
-            partition2nodeMap.forEach((key, value) -> {
-                if (value.equals(participant)) {
-                    partitionToPrepareForFailback.add(key);
-                }
-            });
-            PreparePartitionsFailbackRequestMessage msg = new PreparePartitionsFailbackRequestMessage(planId,
-                    requestId++, participant, partitionToPrepareForFailback);
-            if (participant.equals(nodeToReleaseMetadataManager)) {
-                msg.setReleaseMetadataNode(true);
-            }
-            node2Partitions.add(msg);
-        }
-        return node2Partitions;
-    }
-
-    public synchronized CompleteFailbackRequestMessage getCompleteFailbackRequestMessage() {
-        return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, getPartitionsToFailback());
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public long getPlanId() {
-        return planId;
-    }
-
-    public void setNodeToReleaseMetadataManager(String nodeToReleaseMetadataManager) {
-        this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager;
-    }
-
-    public synchronized FailbackPlanState getState() {
-        return state;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
-        sb.append(" Failing back node: " + nodeId);
-        sb.append(" Participants: " + participants);
-        sb.append(" Partitions to Failback: " + partition2nodeMap.keySet());
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
deleted file mode 100644
index ad4afd0..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage implements INcAddressedMessage {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = LogManager.getLogger();
-    private final Set<Integer> partitions;
-    private final String nodeId;
-
-    public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.nodeId = nodeId;
-        this.partitions = partitions;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(CompleteFailbackRequestMessage.class.getSimpleName());
-        sb.append(" Plan ID: " + planId);
-        sb.append(" Node ID: " + nodeId);
-        sb.append(" Partitions: " + partitions);
-        return sb.toString();
-    }
-
-    @Override
-    public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
-        INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker();
-        HyracksDataException hde = null;
-        try {
-            IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
-            remoteRecoeryManager.completeFailbackProcess();
-        } catch (IOException | InterruptedException e) {
-            LOGGER.log(Level.ERROR, "Failure during completion of failback process", e);
-            hde = HyracksDataException.create(e);
-        } finally {
-            CompleteFailbackResponseMessage reponse =
-                    new CompleteFailbackResponseMessage(planId, requestId, partitions);
-            try {
-                broker.sendMessageToCC(reponse);
-            } catch (Exception e) {
-                LOGGER.log(Level.ERROR, "Failure sending message to CC", e);
-                hde = HyracksDataException.suppress(hde, e);
-            }
-        }
-        if (hde != null) {
-            throw hde;
-        }
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.COMPLETE_FAILBACK_REQUEST;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
deleted file mode 100644
index 0c5678f..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.util.Set;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage implements ICcAddressedMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-
-    public CompleteFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.partitions = partitions;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(CompleteFailbackResponseMessage.class.getSimpleName());
-        sb.append(" Plan ID: " + planId);
-        sb.append(" Partitions: " + partitions);
-        return sb.toString();
-    }
-
-    @Override
-    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.COMPLETE_FAILBACK_RESPONSE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
deleted file mode 100644
index 6b85050..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.rmi.RemoteException;
-import java.util.Set;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage
-        implements INcAddressedMessage {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = LogManager.getLogger();
-    private final Set<Integer> partitions;
-    private boolean releaseMetadataNode = false;
-    private final String nodeID;
-
-    public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.nodeID = nodeId;
-        this.partitions = partitions;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    public boolean isReleaseMetadataNode() {
-        return releaseMetadataNode;
-    }
-
-    public void setReleaseMetadataNode(boolean releaseMetadataNode) {
-        this.releaseMetadataNode = releaseMetadataNode;
-    }
-
-    public String getNodeID() {
-        return nodeID;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(PreparePartitionsFailbackRequestMessage.class.getSimpleName());
-        sb.append(" Plan ID: " + planId);
-        sb.append(" Partitions: " + partitions);
-        sb.append(" releaseMetadataNode: " + releaseMetadataNode);
-        return sb.toString();
-    }
-
-    @Override
-    public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
-        INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker();
-        /**
-         * if the metadata partition will be failed back
-         * we need to flush and close all datasets including metadata datasets
-         * otherwise we need to close all non-metadata datasets and flush metadata datasets
-         * so that their memory components will be copied to the failing back node
-         */
-        if (releaseMetadataNode) {
-            appContext.getDatasetLifecycleManager().closeAllDatasets();
-            //remove the metadata node stub from RMI registry
-            try {
-                appContext.unexportMetadataNodeStub();
-            } catch (RemoteException e) {
-                LOGGER.log(Level.ERROR, "Failed unexporting metadata stub", e);
-                throw HyracksDataException.create(e);
-            }
-        } else {
-            //close all non-metadata datasets
-            appContext.getDatasetLifecycleManager().closeUserDatasets();
-            //flush the remaining metadata datasets that were not closed
-            appContext.getDatasetLifecycleManager().flushAllDatasets();
-        }
-
-        //mark the partitions to be closed as inactive
-        PersistentLocalResourceRepository localResourceRepo =
-                (PersistentLocalResourceRepository) appContext.getLocalResourceRepository();
-        for (Integer partitionId : partitions) {
-            localResourceRepo.addInactivePartition(partitionId);
-        }
-
-        //send response after partitions prepared for failback
-        PreparePartitionsFailbackResponseMessage reponse =
-                new PreparePartitionsFailbackResponseMessage(planId, requestId, partitions);
-        try {
-            broker.sendMessageToCC(reponse);
-        } catch (Exception e) {
-            LOGGER.log(Level.ERROR, "Failed sending message to cc", e);
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.PREPARE_FAILBACK_REQUEST;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
deleted file mode 100644
index bea1039..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.util.Set;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage
-        implements ICcAddressedMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-
-    public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.partitions = partitions;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    @Override
-    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
-    }
-
-    @Override
-    public String toString() {
-        return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitions.toString();
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.PREPARE_FAILBACK_RESPONSE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
deleted file mode 100644
index beac2b5..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.util.Set;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class ReplayPartitionLogsRequestMessage implements INCLifecycleMessage, INcAddressedMessage {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-
-    public ReplayPartitionLogsRequestMessage(Set<Integer> partitions) {
-        this.partitions = partitions;
-    }
-
-    @Override
-    public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
-        NodeControllerService ncs = (NodeControllerService) appContext.getServiceContext().getControllerService();
-        // Replay the logs for these partitions and flush any impacted dataset
-        appContext.getRemoteRecoveryManager().replayReplicaPartitionLogs(partitions, true);
-
-        INCMessageBroker broker = (INCMessageBroker) ncs.getContext().getMessageBroker();
-        ReplayPartitionLogsResponseMessage reponse = new ReplayPartitionLogsResponseMessage(ncs.getId(), partitions);
-        try {
-            broker.sendMessageToCC(reponse);
-        } catch (Exception e) {
-            LOGGER.log(Level.ERROR, "Failed sending message to cc", e);
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.REPLAY_LOGS_REQUEST;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
deleted file mode 100644
index e05fd47..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.util.Set;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ReplayPartitionLogsResponseMessage implements INCLifecycleMessage, ICcAddressedMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-    private final String nodeId;
-
-    public ReplayPartitionLogsResponseMessage(String nodeId, Set<Integer> partitions) {
-        this.partitions = partitions;
-        this.nodeId = nodeId;
-    }
-
-    @Override
-    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.REPLAY_LOGS_RESPONSE;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
deleted file mode 100644
index 86be516..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import java.io.IOException;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class TakeoverPartitionsRequestMessage implements INCLifecycleMessage, INcAddressedMessage {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = LogManager.getLogger();
-    private final Integer[] partitions;
-    private final long requestId;
-    private final String nodeId;
-
-    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
-        this.requestId = requestId;
-        this.nodeId = nodeId;
-        this.partitions = partitionsToTakeover;
-    }
-
-    public Integer[] getPartitions() {
-        return partitions;
-    }
-
-    public long getRequestId() {
-        return requestId;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(TakeoverPartitionsRequestMessage.class.getSimpleName());
-        sb.append(" Request ID: " + requestId);
-        sb.append(" Node ID: " + nodeId);
-        sb.append(" Partitions: ");
-        for (Integer partitionId : partitions) {
-            sb.append(partitionId + ",");
-        }
-        //remove last comma
-        sb.charAt(sb.length() - 1);
-        return sb.toString();
-    }
-
-    @Override
-    public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
-        INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker();
-        //if the NC is shutting down, it should ignore takeover partitions request
-        if (!appContext.isShuttingdown()) {
-            HyracksDataException hde = null;
-            try {
-                IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
-                remoteRecoeryManager.takeoverPartitons(partitions);
-            } catch (IOException | ACIDException e) {
-                LOGGER.log(Level.ERROR, "Failure taking over partitions", e);
-                hde = HyracksDataException.suppress(hde, e);
-            } finally {
-                //send response after takeover is completed
-                TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId,
-                        appContext.getTransactionSubsystem().getId(), partitions);
-                try {
-                    broker.sendMessageToCC(reponse);
-                } catch (Exception e) {
-                    LOGGER.log(Level.ERROR, "Failure taking over partitions", e);
-                    hde = HyracksDataException.suppress(hde, e);
-                }
-            }
-            if (hde != null) {
-                throw hde;
-            }
-        }
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.TAKEOVER_PARTITION_REQUEST;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
deleted file mode 100644
index d9484f9..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.replication.message;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.replication.INCLifecycleMessage;
-import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class TakeoverPartitionsResponseMessage implements INCLifecycleMessage, ICcAddressedMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Integer[] partitions;
-    private final String nodeId;
-    private final long requestId;
-
-    public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
-        this.requestId = requestId;
-        this.nodeId = nodeId;
-        this.partitions = partitionsToTakeover;
-    }
-
-    public Integer[] getPartitions() {
-        return partitions;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public long getRequestId() {
-        return requestId;
-    }
-
-    @Override
-    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this);
-    }
-
-    @Override
-    public String toString() {
-        return TakeoverPartitionsResponseMessage.class.getSimpleName();
-    }
-
-    @Override
-    public MessageType getType() {
-        return MessageType.TAKEOVER_PARTITION_RESPONSE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 622e28f..c2a0fc1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -142,9 +142,8 @@ public class CCApplication extends BaseCCApplication {
         ILibraryManager libraryManager = new ExternalLibraryManager();
         ReplicationProperties repProp = new ReplicationProperties(
                 PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
-        IReplicationStrategy repStrategy = ReplicationStrategyFactory.create(repProp.getReplicationStrategy(), repProp,
-                getConfigManager());
-        IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory.create(ccServiceCtx, repProp, repStrategy);
+        IFaultToleranceStrategy ftStrategy =
+                FaultToleranceStrategyFactory.create(ccServiceCtx, repProp.isReplicationEnabled());
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
         componentProvider = new StorageComponentProvider();
         GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
deleted file mode 100644
index ddd0967..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.util;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.runtime.message.ReplicaEventMessage;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-import org.apache.hyracks.api.config.IOption;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class FaultToleranceUtil {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private FaultToleranceUtil() {
-        throw new AssertionError();
-    }
-
-    public static void notifyImpactedReplicas(String nodeId, ClusterEventType event,
-            IClusterStateManager clusterManager, ICCMessageBroker messageBroker,
-            IReplicationStrategy replicationStrategy) {
-        List<String> primaryRemoteReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
-                .map(Replica::getId).collect(Collectors.toList());
-        String nodeIdAddress = StringUtils.EMPTY;
-        int nodePort = -1;
-        Map<String, Map<IOption, Object>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
-
-        // In case the node joined with a new IP address, we need to send it to the other replicas
-        if (event == ClusterEventType.NODE_JOIN) {
-            nodeIdAddress = (String) activeNcConfiguration.get(nodeId).get(NCConfig.Option.REPLICATION_PUBLIC_ADDRESS);
-            nodePort = (int) activeNcConfiguration.get(nodeId).get(NCConfig.Option.REPLICATION_PUBLIC_PORT);
-        }
-        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, nodePort, event);
-        for (String replica : primaryRemoteReplicas) {
-            // If the remote replica is alive, send the event
-            if (activeNcConfiguration.containsKey(replica)) {
-                try {
-                    messageBroker.sendApplicationMessageToNC(msg, replica);
-                } catch (Exception e) {
-                    LOGGER.warn("Failed sending an application message to an NC", e);
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/main/resources/cc-rep.conf
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/resources/cc-rep.conf b/asterixdb/asterix-app/src/main/resources/cc-rep.conf
index 9c093dc..bfca677 100644
--- a/asterixdb/asterix-app/src/main/resources/cc-rep.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc-rep.conf
@@ -48,5 +48,4 @@ heartbeat.period=2000
 [common]
 log.level = INFO
 replication.enabled=true
-replication.strategy=metadata_only
-replication.factor=2
+replication.strategy=all
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.1.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.1.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.1.sto.cmd
new file mode 100644
index 0000000..1947749
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.1.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2 /addReplica 2 asterix_nc1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.10.post.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.10.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.10.post.http
new file mode 100644
index 0000000..a3ea801
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.10.post.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/admin/cluster/partition/master?partition=3&node=asterix_nc1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.11.pollget.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.11.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.11.pollget.http
new file mode 100644
index 0000000..32e2f78
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.11.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=30
+
+/admin/cluster/summary
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.12.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.12.query.sqlpp
new file mode 100644
index 0000000..cd777cf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.12.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+select value count(*) from LineItem;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.13.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.13.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.13.sto.cmd
new file mode 100644
index 0000000..1e192f4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.13.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2 /removeReplica 2 asterix_nc1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.14.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.14.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.14.sto.cmd
new file mode 100644
index 0000000..530432f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.14.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2 /removeReplica 3 asterix_nc1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.2.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.2.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.2.sto.cmd
new file mode 100644
index 0000000..f3810f8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.2.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2 /addReplica 3 asterix_nc1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.3.pollget.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.3.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.3.pollget.http
new file mode 100644
index 0000000..4ea16d7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.3.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=30
+
+nc:asterix_nc2 /admin/storage/partition/2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.4.pollget.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.4.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.4.pollget.http
new file mode 100644
index 0000000..22558bc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.4.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=30
+
+nc:asterix_nc2 /admin/storage/partition/3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.5.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.5.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.5.ddl.sqlpp
new file mode 100644
index 0000000..071a40f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.5.ddl.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+create type tpch.LineItemType as
+ closed {
+  l_orderkey : bigint,
+  l_partkey : bigint,
+  l_suppkey : bigint,
+  l_linenumber : bigint,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+};
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.6.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.6.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.6.update.sqlpp
new file mode 100644
index 0000000..e53f462
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.6.update.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+      (`format`=`delimited-text`),(`delimiter`=`|`));
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.7.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.7.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.7.sto.cmd
new file mode 100644
index 0000000..389cf68
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.7.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /promote 2 asterix_nc1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.8.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.8.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.8.sto.cmd
new file mode 100644
index 0000000..257f26a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.8.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /promote 3 asterix_nc1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.9.post.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.9.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.9.post.http
new file mode 100644
index 0000000..36e1d00
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/bulkload/bulkload.9.post.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/admin/cluster/partition/master?partition=2&node=asterix_nc1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.1.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.1.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.1.sto.cmd
new file mode 100644
index 0000000..7ddaa20
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.1.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /addReplica 0 asterix_nc2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.10.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.10.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.10.sto.cmd
new file mode 100644
index 0000000..71621ac
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.10.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /removeReplica 0 asterix_nc2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.2.pollget.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.2.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.2.pollget.http
new file mode 100644
index 0000000..6867a5d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.2.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=30
+
+nc:asterix_nc1 /admin/storage/partition/0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.3.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.3.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.3.ddl.sqlpp
new file mode 100644
index 0000000..15bc3c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.3.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+CREATE TYPE MyType AS {
+  id : int
+};
+
+CREATE DATASET ds_1(MyType) PRIMARY KEY id;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.4.get.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.4.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.4.get.http
new file mode 100644
index 0000000..09ddf42
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.4.get.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/connector?dataverseName=Metadata&datasetName=Dataset
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.5.sto.cmd
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.5.sto.cmd b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.5.sto.cmd
new file mode 100644
index 0000000..a5753f0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.5.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc2 /promote 0 asterix_nc2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.6.post.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.6.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.6.post.http
new file mode 100644
index 0000000..2e8fc63
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.6.post.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/admin/cluster/partition/master?partition=0&node=asterix_nc2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.7.post.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.7.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.7.post.http
new file mode 100644
index 0000000..e8dca0b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.7.post.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/admin/cluster/metadataNode?node=asterix_nc2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.8.pollget.http
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.8.pollget.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.8.pollget.http
new file mode 100644
index 0000000..32e2f78
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/flushed_component/flushed_component.8.pollget.http
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+//polltimeoutsecs=30
+
+/admin/cluster/summary
\ No newline at end of file