You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:24 UTC

[helix] 21/37: Implement AssignmentMetadataStore (#453)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3d2da32bddc7798a502ae170873f0fb1abe8cbd5
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Mon Sep 9 16:40:24 2019 -0700

    Implement AssignmentMetadataStore (#453)
    
    Implement AssignmentMetadataStore
    
    AssignmentMetadataStore is a component for the new WAGED Rebalaner. It provides APIs that allows the rebalancer to read and write the baseline and best possible assignments using BucketDataAccessor.
    
    Changelist:
    1. Add AssignmentMetadataStore
    2. Add an integration test: TestAssignmentMetadataStore
---
 .../rebalancer/waged/AssignmentMetadataStore.java  | 112 +++++++++++++++++++--
 .../rebalancer/waged/WagedRebalancer.java          |  64 ++++++------
 .../manager/zk/ZNRecordJacksonSerializer.java      |   4 +-
 .../waged/MockAssignmentMetadataStore.java         |   9 +-
 .../waged/TestAssignmentMetadataStore.java         | 101 +++++++++++++++++++
 5 files changed, 244 insertions(+), 46 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index cc52dac..bf9f292 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -19,6 +19,16 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
+import java.io.IOException;
+import java.util.Arrays;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.BucketDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.ResourceAssignment;
 
 import java.util.HashMap;
@@ -28,24 +38,106 @@ import java.util.Map;
  * A placeholder before we have the real assignment metadata store.
  */
 public class AssignmentMetadataStore {
-  private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
-  private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
+  private static final String ASSIGNMENT_METADATA_KEY = "ASSIGNMENT_METADATA";
+  private static final String BASELINE_TEMPLATE = "/%s/%s/BASELINE";
+  private static final String BEST_POSSIBLE_TEMPLATE = "/%s/%s/BEST_POSSIBLE";
+  private static final String BASELINE_KEY = "BASELINE";
+  private static final String BEST_POSSIBLE_KEY = "BEST_POSSIBLE";
+  private static final ZkSerializer SERIALIZER = new ZNRecordJacksonSerializer();
+
+  private BucketDataAccessor _dataAccessor;
+  private String _baselinePath;
+  private String _bestPossiblePath;
+  private Map<String, ResourceAssignment> _globalBaseline;
+  private Map<String, ResourceAssignment> _bestPossibleAssignment;
+
+  AssignmentMetadataStore(HelixManager helixManager) {
+    _dataAccessor = new ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString());
+    _baselinePath =
+        String.format(BASELINE_TEMPLATE, helixManager.getClusterName(), ASSIGNMENT_METADATA_KEY);
+    _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, helixManager.getClusterName(),
+        ASSIGNMENT_METADATA_KEY);
+  }
 
   public Map<String, ResourceAssignment> getBaseline() {
-    return _persistGlobalBaseline;
+    // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
+    if (_globalBaseline == null) {
+      HelixProperty baseline =
+          _dataAccessor.compressedBucketRead(_baselinePath, HelixProperty.class);
+      _globalBaseline = splitAssignments(baseline);
+    }
+    return _globalBaseline;
+  }
+
+  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
+    // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
+    if (_bestPossibleAssignment == null) {
+      HelixProperty baseline =
+          _dataAccessor.compressedBucketRead(_bestPossiblePath, HelixProperty.class);
+      _bestPossibleAssignment = splitAssignments(baseline);
+    }
+    return _bestPossibleAssignment;
   }
 
   public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-    // TODO clean up invalid items
-    _persistGlobalBaseline = globalBaseline;
+    // TODO: Make the write async?
+    // Persist to ZK
+    HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline);
+    try {
+      _dataAccessor.compressedBucketWrite(_baselinePath, combinedAssignments);
+    } catch (IOException e) {
+      // TODO: Improve failure handling
+      throw new HelixException("Failed to persist baseline!", e);
+    }
+
+    // Update the in-memory reference
+    _globalBaseline = globalBaseline;
   }
 
-  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
-    return _persistBestPossibleAssignment;
+  public void persistBestPossibleAssignment(
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // TODO: Make the write async?
+    // Persist to ZK asynchronously
+    HelixProperty combinedAssignments =
+        combineAssignments(BEST_POSSIBLE_KEY, bestPossibleAssignment);
+    try {
+      _dataAccessor.compressedBucketWrite(_bestPossiblePath, combinedAssignments);
+    } catch (IOException e) {
+      // TODO: Improve failure handling
+      throw new HelixException("Failed to persist baseline!", e);
+    }
+
+    // Update the in-memory reference
+    _bestPossibleAssignment = bestPossibleAssignment;
+  }
+
+  /**
+   * Produces one HelixProperty that contains all assignment data.
+   * @param name
+   * @param assignmentMap
+   * @return
+   */
+  private HelixProperty combineAssignments(String name,
+      Map<String, ResourceAssignment> assignmentMap) {
+    HelixProperty property = new HelixProperty(name);
+    // Add each resource's assignment as a simple field in one ZNRecord
+    assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource,
+        Arrays.toString(SERIALIZER.serialize(assignment.getRecord()))));
+    return property;
   }
 
-  public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
-    // TODO clean up invalid items
-    _persistBestPossibleAssignment.putAll(bestPossibleAssignment);
+  /**
+   * Returns a Map of (ResourceName, ResourceAssignment) pairs.
+   * @param property
+   * @return
+   */
+  private Map<String, ResourceAssignment> splitAssignments(HelixProperty property) {
+    Map<String, ResourceAssignment> assignmentMap = new HashMap<>();
+    // Convert each resource's assignment String into a ResourceAssignment object and put it in a
+    // map
+    property.getRecord().getSimpleFields()
+        .forEach((resource, assignment) -> assignmentMap.put(resource,
+            new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignment.getBytes()))));
+    return assignmentMap;
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 866c7c9..22cac7e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -51,10 +51,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Weight-Aware Globally-Even Distribute Rebalancer.
- *
- * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
- * Design Document
- * </a>
+ * @see <a
+ *      href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
+ *      Design Document
+ *      </a>
  */
 public class WagedRebalancer {
   private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
@@ -62,8 +62,8 @@ public class WagedRebalancer {
   // When any of the following change happens, the rebalancer needs to do a global rebalance which
   // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
   private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
-      Collections.unmodifiableSet(new HashSet<>(Arrays
-          .asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
+      Collections
+          .unmodifiableSet(new HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
               HelixConstants.ChangeType.CLUSTER_CONFIG,
               HelixConstants.ChangeType.INSTANCE_CONFIG)));
   // The cluster change detector is a stateful object.
@@ -73,7 +73,8 @@ public class WagedRebalancer {
   private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
 
   // --------- The following fields are placeholders and need replacement. -----------//
-  // TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
+  // TODO Shall we make the metadata store a static threadlocal object as well to avoid
+  // reinitialization?
   private final AssignmentMetadataStore _assignmentMetadataStore;
   private final RebalanceAlgorithm _rebalanceAlgorithm;
   // ------------------------------------------------------------------------------------//
@@ -81,8 +82,8 @@ public class WagedRebalancer {
   public WagedRebalancer(HelixManager helixManager) {
     this(
         // TODO init the metadata store according to their requirement when integrate,
-        //  or change to final static method if possible.
-        new AssignmentMetadataStore(),
+        // or change to final static method if possible.
+        new AssignmentMetadataStore(helixManager),
         // TODO parse the cluster setting
         ConstraintBasedAlgorithmFactory.getInstance(),
         // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
@@ -103,14 +104,14 @@ public class WagedRebalancer {
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm) {
     this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
+
   }
 
   /**
    * Compute the new IdealStates for all the input resources. The IdealStates include both new
    * partition assignment (in the listFiles) and the new replica state mapping (in the mapFields).
-   *
-   * @param clusterData        The Cluster status data provider.
-   * @param resourceMap        A map containing all the rebalancing resources.
+   * @param clusterData The Cluster status data provider.
+   * @param resourceMap A map containing all the rebalancing resources.
    * @param currentStateOutput The present Current States of the resources.
    * @return A map of the new IdealStates with the resource name as key.
    */
@@ -124,8 +125,8 @@ public class WagedRebalancer {
       IdealState is = clusterData.getIdealState(resourceEntry.getKey());
       return is != null && is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
           && getClass().getName().equals(is.getRebalancerClassName());
-    }).collect(Collectors
-        .toMap(resourceEntry -> resourceEntry.getKey(), resourceEntry -> resourceEntry.getValue()));
+    }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
+        resourceEntry -> resourceEntry.getValue()));
 
     if (resourceMap.isEmpty()) {
       LOG.warn("There is no valid resource to be rebalanced by {}",
@@ -140,13 +141,13 @@ public class WagedRebalancer {
     Map<String, IdealState> newIdealStates = computeBestPossibleStates(clusterData, resourceMap);
 
     // Construct the new best possible states according to the current state and target assignment.
-    // Note that the new ideal state might be an intermediate state between the current state and the target assignment.
+    // Note that the new ideal state might be an intermediate state between the current state and
+    // the target assignment.
     for (IdealState is : newIdealStates.values()) {
       String resourceName = is.getResourceName();
       // Adjust the states according to the current state.
-      ResourceAssignment finalAssignment = _mappingCalculator
-          .computeBestPossiblePartitionState(clusterData, is, resourceMap.get(resourceName),
-              currentStateOutput);
+      ResourceAssignment finalAssignment = _mappingCalculator.computeBestPossiblePartitionState(
+          clusterData, is, resourceMap.get(resourceName), currentStateOutput);
 
       // Clean up the state mapping fields. Use the final assignment that is calculated by the
       // mapping calculator to replace them.
@@ -195,10 +196,10 @@ public class WagedRebalancer {
       IdealState newIdeaState;
       try {
         IdealState currentIdealState = clusterData.getIdealState(resourceName);
-        Map<String, Integer> statePriorityMap =
-            clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
-                .getStatePriorityMap();
-        // Create a new IdealState instance contains the new calculated assignment in the preference list.
+        Map<String, Integer> statePriorityMap = clusterData
+            .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+        // Create a new IdealState instance contains the new calculated assignment in the preference
+        // list.
         newIdeaState = generateIdealStateWithAssignment(resourceName, currentIdealState,
             newAssignment.get(resourceName), statePriorityMap);
       } catch (Exception ex) {
@@ -227,9 +228,8 @@ public class WagedRebalancer {
       throw new HelixRebalanceException("Failed to get the current baseline assignment.",
           HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
     }
-    Map<String, ResourceAssignment> baseline =
-        calculateAssignment(clusterData, clusterChanges, resourceMap, clusterData.getAllInstances(),
-            Collections.emptyMap(), currentBaseline);
+    Map<String, ResourceAssignment> baseline = calculateAssignment(clusterData, clusterChanges,
+        resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), currentBaseline);
     try {
       _assignmentMetadataStore.persistBaseline(baseline);
     } catch (Exception ex) {
@@ -254,9 +254,8 @@ public class WagedRebalancer {
       throw new HelixRebalanceException("Failed to get the persisted assignment records.",
           HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
     }
-    Map<String, ResourceAssignment> newAssignment =
-        calculateAssignment(clusterData, clusterChanges, resourceMap, activeInstances, baseline,
-            prevBestPossibleAssignment);
+    Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges,
+        resourceMap, activeInstances, baseline, prevBestPossibleAssignment);
     try {
       // TODO Test to confirm if persisting the final assignment (with final partition states)
       // would be a better option.
@@ -271,13 +270,13 @@ public class WagedRebalancer {
 
   /**
    * Generate the cluster model based on the input and calculate the optimal assignment.
-   *
    * @param clusterData                the cluster data cache.
    * @param clusterChanges             the detected cluster changes.
    * @param resourceMap                the rebalancing resources.
    * @param activeNodes                the alive and enabled nodes.
    * @param baseline                   the baseline assignment for the algorithm as a reference.
-   * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a reference.
+   * @param prevBestPossibleAssignment the previous best possible assignment for the algorithm as a
+   *                                   reference.
    * @return the new optimal assignment for the resources.
    */
   private Map<String, ResourceAssignment> calculateAssignment(
@@ -289,9 +288,8 @@ public class WagedRebalancer {
     LOG.info("Start calculating for an assignment");
     ClusterModel clusterModel;
     try {
-      clusterModel = ClusterModelProvider
-          .generateClusterModel(clusterData, resourceMap, activeNodes, clusterChanges, baseline,
-              prevBestPossibleAssignment);
+      clusterModel = ClusterModelProvider.generateClusterModel(clusterData, resourceMap,
+          activeNodes, clusterChanges, baseline, prevBestPossibleAssignment);
     } catch (Exception ex) {
       throw new HelixRebalanceException("Failed to generate cluster model.",
           HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
index 989017a..b375e80 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
@@ -27,8 +27,8 @@ import org.apache.helix.ZNRecord;
 import org.codehaus.jackson.map.ObjectMapper;
 
 /**
- * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using MessagePack's
- * serializer. Note that this serializer doesn't check for the size of the resulting binary.
+ * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using Jackson. Note that
+ * this serializer doesn't check for the size of the resulting binary.
  */
 public class ZNRecordJacksonSerializer implements ZkSerializer {
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index ea8c164..8b80f2d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
+import org.apache.helix.HelixManager;
 import org.apache.helix.model.ResourceAssignment;
 
 import java.util.HashMap;
@@ -32,6 +33,11 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
   private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
   private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
 
+  public MockAssignmentMetadataStore() {
+    // In-memory mock component, so pass null for HelixManager since it's not needed
+    super(null);
+  }
+
   public Map<String, ResourceAssignment> getBaseline() {
     return _persistGlobalBaseline;
   }
@@ -44,7 +50,8 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
     return _persistBestPossibleAssignment;
   }
 
-  public void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
+  public void persistBestPossibleAssignment(
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
     _persistBestPossibleAssignment = bestPossibleAssignment;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
new file mode 100644
index 0000000..922915f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -0,0 +1,101 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ResourceAssignment;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestAssignmentMetadataStore extends ZkTestBase {
+  protected static final int NODE_NR = 5;
+  protected static final int START_PORT = 12918;
+  protected static final String STATE_MODEL = "MasterSlave";
+  protected static final String TEST_DB = "TestDB";
+  protected static final int _PARTITIONS = 20;
+
+  protected HelixManager _manager;
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  protected MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
+  protected ClusterControllerManager _controller;
+  protected int _replica = 3;
+
+  private AssignmentMetadataStore _store;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
+    for (int i = 0; i < NODE_NR; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
+
+    // start dummy participants
+    for (int i = 0; i < NODE_NR; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // create cluster manager
+    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    // create AssignmentMetadataStore
+    _store = new AssignmentMetadataStore(_manager);
+  }
+
+  /**
+   * TODO: Reading baseline will be empty because AssignmentMetadataStore isn't being used yet by
+   * the new rebalancer. Modify this integration test once the WAGED rebalancer
+   * starts using AssignmentMetadataStore's persist APIs.
+   * TODO: WAGED Rebalancer currently does NOT work with ZKClusterVerifier because verifier's
+   * HelixManager is null, and that causes an NPE when instantiating AssignmentMetadataStore.
+   */
+  @Test
+  public void testReadEmptyBaseline() {
+    try {
+      Map<String, ResourceAssignment> baseline = _store.getBaseline();
+      Assert.fail("Should fail because there shouldn't be any data.");
+    } catch (Exception e) {
+      // OK
+    }
+  }
+}