You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2019/05/17 00:38:44 UTC

[asterixdb] branch master updated: [NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b637db0  [NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state
b637db0 is described below

commit b637db0c6ce9442cd25673fe8505fdd555f02b82
Author: Michael Blow <mb...@apache.org>
AuthorDate: Mon May 13 18:47:22 2019 -0400

    [NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state
    
    Allow extensions to mandate that a rebalance is required in order for
    the cluster to go active.
    
    Change-Id: I863f8bf3fe1ce8d59522c9a28a1283006ffa414c
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3394
    Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../asterix/common/api/IClusterManagementWork.java |  4 +-
 .../common/cluster/IClusterStateManager.java       | 15 ++++++
 .../asterix/runtime/utils/ClusterStateManager.java | 58 ++++++++++++++--------
 3 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index c2d3303..e13756c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -30,8 +30,8 @@ public interface IClusterManagementWork {
         PENDING, // the metadata node has not yet joined & initialized
         RECOVERING, // global recovery has not yet completed
         ACTIVE, // cluster is ACTIVE and ready for requests
-        REBALANCING, // replication is processing failbacks
-        SHUTTING_DOWN // a shutdown request has been received, and is underway
+        SHUTTING_DOWN, // a shutdown request has been received, and is underway
+        REBALANCE_REQUIRED // one or more datasets require rebalance before the cluster is usable
     }
 
     WorkType getClusterManagementWorkType();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 0e62851..6c39372 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -21,6 +21,7 @@ package org.apache.asterix.common.cluster;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -110,6 +111,14 @@ public interface IClusterStateManager {
             throws HyracksDataException, InterruptedException;
 
     /**
+     * Blocks until the cluster state matches supplied predicate, or timeout is exhausted.
+     *
+     * @return the cluster state matching the predicate if it was satisfied before timeout occurred, otherwise null
+     */
+    ClusterState waitForState(Predicate<ClusterState> condition, long timeout, TimeUnit unit)
+            throws HyracksDataException, InterruptedException;
+
+    /**
      * Register the specified node partitions with the specified nodeId with this cluster state manager
      * then calls {@link IClusterStateManager#refreshState()}
      *
@@ -250,4 +259,10 @@ public interface IClusterStateManager {
      * @return The metadata cluster partitions
      */
     ClusterPartition getMetadataPartition();
+
+    /**
+     * Indicate whether one or more datasets must be rebalanced before the cluster becomes ACTIVE
+     * @param rebalanceRequired
+     */
+    void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 16a479e..7933cd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -75,6 +76,7 @@ public class ClusterStateManager implements IClusterStateManager {
     private INcLifecycleCoordinator lifecycleCoordinator;
     private ICcApplicationContext appCtx;
     private ClusterPartition metadataPartition;
+    private boolean rebalanceRequired;
 
     @Override
     public void setCcAppCtx(ICcApplicationContext appCtx) {
@@ -186,45 +188,55 @@ public class ClusterStateManager implements IClusterStateManager {
             return;
         }
         // the metadata bootstrap & global recovery must be complete before the cluster can be active
-        if (metadataNodeActive) {
-            if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
-                setState(ClusterState.PENDING);
-            }
-            appCtx.getMetadataBootstrap().init();
-
-            if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
-                setState(ClusterState.ACTIVE);
-            } else {
-                // start global recovery
-                setState(ClusterState.RECOVERING);
-                appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
-            }
-        } else {
+        if (!metadataNodeActive) {
             setState(ClusterState.PENDING);
+            return;
+        }
+        if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
+            setState(ClusterState.PENDING);
+        }
+        appCtx.getMetadataBootstrap().init();
+
+        if (!appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
+            // start global recovery
+            setState(ClusterState.RECOVERING);
+            appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+            return;
         }
+        if (rebalanceRequired) {
+            setState(ClusterState.REBALANCE_REQUIRED);
+            return;
+        }
+        // finally- life is good, set the state to ACTIVE
+        setState(ClusterState.ACTIVE);
     }
 
     @Override
-    public synchronized void waitForState(ClusterState waitForState) throws HyracksDataException, InterruptedException {
+    public synchronized void waitForState(ClusterState waitForState) throws InterruptedException {
         while (state != waitForState) {
             wait();
         }
     }
 
     @Override
-    public synchronized boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
-            throws HyracksDataException, InterruptedException {
+    public boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) throws InterruptedException {
+        return waitForState(waitForState::equals, timeout, unit) != null;
+    }
+
+    @Override
+    public synchronized ClusterState waitForState(Predicate<ClusterState> predicate, long timeout, TimeUnit unit)
+            throws InterruptedException {
         final long startMillis = System.currentTimeMillis();
         final long endMillis = startMillis + unit.toMillis(timeout);
-        while (state != waitForState) {
+        while (!predicate.test(state)) {
             long millisToSleep = endMillis - System.currentTimeMillis();
             if (millisToSleep > 0) {
                 wait(millisToSleep);
             } else {
-                return false;
+                return null;
             }
         }
-        return true;
+        return state;
     }
 
     @Override
@@ -458,6 +470,12 @@ public class ClusterStateManager implements IClusterStateManager {
         return metadataPartition;
     }
 
+    @Override
+    public synchronized void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException {
+        this.rebalanceRequired = rebalanceRequired;
+        refreshState();
+    }
+
     private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) {
         final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
         resourceIdManager.report(nodeId, localCounters.getMaxResourceId());