You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/04/23 19:27:51 UTC

[helix] 05/23: Implement Helix API for updating customized state (#729)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f4e3a93d3a60a3fb243815f4c33b7d35ea6b85c5
Author: zhangmeng916 <56...@users.noreply.github.com>
AuthorDate: Wed Feb 26 10:05:55 2020 -0800

    Implement Helix API for updating customized state (#729)
    
    Implement Helix APIs in CustomizedStateProvider for customers to operate on their own customized state. The available operations include update, get, and delete. To use CustomizedStateProvider, a Helix user should initialize its factory and pass the required parameters.
---
 .../main/java/org/apache/helix/PropertyKey.java    |  26 ++++
 .../java/org/apache/helix/PropertyPathBuilder.java |  23 +++
 .../main/java/org/apache/helix/PropertyType.java   |   2 +
 .../customizedstate/CustomizedStateProvider.java   | 130 +++++++++++++++++
 .../CustomizedStateProviderFactory.java            |  79 +++++++++++
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |   1 +
 .../org/apache/helix/model/CustomizedState.java    | 155 +++++++++++++++++++++
 .../org/apache/helix/TestPropertyPathBuilder.java  |   5 +
 .../paticipant/TestCustomizedStateUpdate.java      | 152 ++++++++++++++++++++
 .../java/org/apache/helix/mock/MockHelixAdmin.java |   3 +
 10 files changed, 576 insertions(+)

diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 1df56ff..39f18c9 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -28,6 +28,7 @@ import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.CustomizedStateAggregationConfig;
+import org.apache.helix.model.CustomizedState;
 import org.apache.helix.model.Error;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import static org.apache.helix.PropertyType.CONFIGS;
 import static org.apache.helix.PropertyType.CONTROLLER;
 import static org.apache.helix.PropertyType.CURRENTSTATES;
+import static org.apache.helix.PropertyType.CUSTOMIZEDSTATES;
 import static org.apache.helix.PropertyType.ERRORS;
 import static org.apache.helix.PropertyType.ERRORS_CONTROLLER;
 import static org.apache.helix.PropertyType.EXTERNALVIEW;
@@ -481,6 +483,30 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with {@link CustomizedState} of an instance and customized state
+     * @param instanceName
+     * @param customizedStateName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey customizedStates(String instanceName, String customizedStateName) {
+      return new PropertyKey(CUSTOMIZEDSTATES, CustomizedState.class, _clusterName, instanceName,
+          customizedStateName);
+    }
+
+    /**
+     * Get a property key associated with {@link CustomizedState} of an instance, customized state, and resource
+     * @param instanceName
+     * @param customizedStateName
+     * @param resourceName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey customizedState(String instanceName, String customizedStateName,
+        String resourceName) {
+      return new PropertyKey(CUSTOMIZEDSTATES, CustomizedState.class, _clusterName, instanceName,
+          customizedStateName, resourceName);
+    }
+
+    /**
      * Get a property key associated with {@link StatusUpdate} of an instance, session, resource,
      * and partition
      * @param instanceName
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index 52bf9f7..48db47a 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
 
 import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.helix.PropertyType.CONFIGS;
 import static org.apache.helix.PropertyType.CURRENTSTATES;
+import static org.apache.helix.PropertyType.CUSTOMIZEDSTATES;
 import static org.apache.helix.PropertyType.EXTERNALVIEW;
 import static org.apache.helix.PropertyType.HISTORY;
 import static org.apache.helix.PropertyType.IDEALSTATES;
@@ -53,6 +55,7 @@ import static org.apache.helix.PropertyType.STATEMODELDEFS;
 import static org.apache.helix.PropertyType.STATUSUPDATES;
 import static org.apache.helix.PropertyType.WORKFLOWCONTEXT;
 
+
 /**
  * Utility mapping properties to their Zookeeper locations
  */
@@ -112,6 +115,12 @@ public class PropertyPathBuilder {
         "/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES/{sessionId}/{resourceName}");
     addEntry(PropertyType.CURRENTSTATES, 5,
         "/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES/{sessionId}/{resourceName}/{bucketName}");
+    addEntry(PropertyType.CUSTOMIZEDSTATES, 2,
+        "/{clusterName}/INSTANCES/{instanceName}/CUSTOMIZEDSTATES");
+    addEntry(PropertyType.CUSTOMIZEDSTATES, 3,
+        "/{clusterName}/INSTANCES/{instanceName}/CUSTOMIZEDSTATES/{customizedStateName}");
+    addEntry(PropertyType.CUSTOMIZEDSTATES, 4,
+        "/{clusterName}/INSTANCES/{instanceName}/CUSTOMIZEDSTATES/{customizedStateName}/{resourceName}");
     addEntry(PropertyType.STATUSUPDATES, 2,
         "/{clusterName}/INSTANCES/{instanceName}/STATUSUPDATES");
     addEntry(PropertyType.STATUSUPDATES, 3,
@@ -312,6 +321,20 @@ public class PropertyPathBuilder {
         sessionId, resourceName);
   }
 
+  public static String instanceCustomizedState(String clusterName, String instanceName) {
+    return String.format("/%s/INSTANCES/%s/CUSTOMIZEDSTATES", clusterName, instanceName);
+  }
+
+  public static String instanceCustomizedState(String clusterName, String instanceName,
+      String customizedStateName) {
+    return String.format("/%s/INSTANCES/%s/CUSTOMIZEDSTATES/%s", clusterName, instanceName, customizedStateName);
+  }
+
+  public static String instanceCustomizedState(String clusterName, String instanceName,
+      String customizedStateName, String resourceName) {
+    return String.format("/%s/INSTANCES/%s/CUSTOMIZEDSTATES/%s/%s", clusterName, instanceName,
+        customizedStateName, resourceName);
+  }
   public static String instanceError(String clusterName, String instanceName) {
     return String.format("/%s/INSTANCES/%s/ERRORS", clusterName, instanceName);
   }
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index f879249..e076322 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -55,6 +55,8 @@ public enum PropertyType {
   ERRORS(Type.INSTANCE, true, true),
   INSTANCE_HISTORY(Type.INSTANCE, true, true, true),
   HEALTHREPORT(Type.INSTANCE, true, false, false, false, false, true),
+  CUSTOMIZEDSTATES(Type.INSTANCE, true, false, false, true, true),
+
 
   // CONTROLLER PROPERTY
   LEADER(Type.CONTROLLER, false, false, true, true),
diff --git a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
new file mode 100644
index 0000000..3807ea6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
@@ -0,0 +1,130 @@
+package org.apache.helix.customizedstate;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.CustomizedState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for Helix customers to operate on customized state
+ */
+public class CustomizedStateProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(CustomizedStateProvider.class);
+  private final HelixManager _helixManager;
+  private final HelixDataAccessor _helixDataAccessor;
+  private String _instanceName;
+
+  public CustomizedStateProvider(HelixManager helixManager, String instanceName) {
+    _helixManager = helixManager;
+    _instanceName = instanceName;
+    _helixDataAccessor = _helixManager.getHelixDataAccessor();
+  }
+
+  /**
+   * Update a specific customized state based on the resource name and partition name. The
+   * customized state is input as a single string
+   */
+  public synchronized void updateCustomizedState(String customizedStateName, String resourceName,
+      String partitionName, String customizedState) {
+    Map<String, String> customizedStateMap = new HashMap<>();
+    customizedStateMap.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), customizedState);
+    updateCustomizedState(customizedStateName, resourceName, partitionName, customizedStateMap);
+  }
+
+  /**
+   * Update a specific customized state based on the resource name and partition name. The
+   * customized state is input as a map
+   */
+  public synchronized void updateCustomizedState(String customizedStateName, String resourceName,
+      String partitionName, Map<String, String> customizedStateMap) {
+    PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
+    PropertyKey propertyKey =
+        keyBuilder.customizedState(_instanceName, customizedStateName, resourceName);
+    ZNRecord record = new ZNRecord(resourceName);
+    Map<String, Map<String, String>> mapFields = new HashMap<>();
+    CustomizedState existingState = getCustomizedState(customizedStateName, resourceName);
+    if (existingState != null
+        && existingState.getRecord().getMapFields().containsKey(partitionName)) {
+      Map<String, String> existingMap = new HashMap<>();
+      for (String key : customizedStateMap.keySet()) {
+        existingMap.put(key, customizedStateMap.get(key));
+      }
+
+      mapFields.put(partitionName, existingMap);
+    } else {
+      mapFields.put(partitionName, customizedStateMap);
+    }
+    record.setMapFields(mapFields);
+    if (!_helixDataAccessor.updateProperty(propertyKey, new CustomizedState(record))) {
+      throw new HelixException(
+          String.format("Failed to persist customized state %s to zk for instance %s, resource %s",
+              customizedStateName, _instanceName, record.getId()));
+    }
+  }
+
+  /**
+   * Get the customized state for a specified resource
+   */
+  public CustomizedState getCustomizedState(String customizedStateName, String resourceName) {
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    return (CustomizedState) accessor
+        .getProperty(keyBuilder.customizedState(_instanceName, customizedStateName, resourceName));
+  }
+
+  /**
+   * Get the customized state for a specified resource and a specified partition
+   */
+  public Map<String, String> getPerPartitionCustomizedState(String customizedStateName,
+      String resourceName, String partitionName) {
+    PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
+    Map<String, Map<String, String>> mapView = _helixDataAccessor
+        .getProperty(keyBuilder.customizedState(_instanceName, customizedStateName, resourceName))
+        .getRecord().getMapFields();
+    return mapView.get(partitionName);
+  }
+
+  /**
+   * Delete the customized state for a specified resource and a specified partition
+   */
+  public void deletePerPartitionCustomizedState(String customizedStateName, String resourceName,
+      String partitionName) {
+    PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
+    PropertyKey propertyKey =
+        keyBuilder.customizedState(_instanceName, customizedStateName, resourceName);
+    CustomizedState existingState = getCustomizedState(customizedStateName, resourceName);
+    _helixDataAccessor.updateProperty(propertyKey, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord current) {
+        current.getMapFields().remove(partitionName);
+        return current;
+      }
+    }, existingState);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProviderFactory.java b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProviderFactory.java
new file mode 100644
index 0000000..3e60522
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProviderFactory.java
@@ -0,0 +1,79 @@
+package org.apache.helix.customizedstate;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Singleton factory that builds customized state providers.
+ */
+public class CustomizedStateProviderFactory {
+  private static Logger LOG = LoggerFactory.getLogger(CustomizedStateProvider.class);
+  private final HashMap<String, CustomizedStateProvider> _customizedStateProviderMap =
+      new HashMap<>();
+  private HelixManager _helixManager;
+
+  protected CustomizedStateProviderFactory() {
+  }
+
+  private static class SingletonHelper {
+    private static final CustomizedStateProviderFactory INSTANCE =
+        new CustomizedStateProviderFactory();
+  }
+
+  public static CustomizedStateProviderFactory getInstance() {
+    return SingletonHelper.INSTANCE;
+  }
+
+  public CustomizedStateProvider buildCustomizedStateProvider(String instanceName) {
+    if (_helixManager == null) {
+      throw new HelixException("Helix Manager has not been set yet.");
+    }
+    return buildCustomizedStateProvider(_helixManager, instanceName);
+  }
+
+  /**
+   * Build a customized state provider based on the specified input. If the instance already has a
+   * provider, return it. Otherwise, build a new one and put it in the map.
+   * @param helixManager The helix manager that belongs to the instance
+   * @param instanceName The name of the instance
+   * @return CustomizedStateProvider
+   */
+  public CustomizedStateProvider buildCustomizedStateProvider(HelixManager helixManager,
+      String instanceName) {
+    synchronized (_customizedStateProviderMap) {
+      if (_customizedStateProviderMap.get(instanceName) != null) {
+        return _customizedStateProviderMap.get(instanceName);
+      }
+      CustomizedStateProvider customizedStateProvider =
+          new CustomizedStateProvider(helixManager, instanceName);
+      _customizedStateProviderMap.put(instanceName, customizedStateProvider);
+      return customizedStateProvider;
+    }
+  }
+
+  public void setHelixManager(HelixManager helixManager) {
+    _helixManager = helixManager;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 24e0a60..3eb4d55 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -189,6 +189,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     _zkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, nodeId), true);
     _zkClient.createPersistent(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), true);
+    _zkClient.createPersistent(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), true);
     _zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true);
     _zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true);
     _zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName, nodeId), true);
diff --git a/helix-core/src/main/java/org/apache/helix/model/CustomizedState.java b/helix-core/src/main/java/org/apache/helix/model/CustomizedState.java
new file mode 100644
index 0000000..7b0b97d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/CustomizedState.java
@@ -0,0 +1,155 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Customized states of partitions in a resource for an instance.
+ */
+public class CustomizedState extends HelixProperty {
+  private static Logger LOG = LoggerFactory.getLogger(CustomizedState.class);
+
+  /**
+   * Lookup keys for the customized state
+   */
+  public enum CustomizedStateProperty {
+    PREVIOUS_STATE,
+    CURRENT_STATE,
+    START_TIME,
+    END_TIME
+  }
+
+  /**
+   * Instantiate a customized state with a resource
+   * @param resourceName name identifying the resource
+   */
+  public CustomizedState(String resourceName) {
+    super(resourceName);
+  }
+
+  /**
+   * Instantiate a customized state with a pre-populated ZNRecord
+   * @param record a ZNRecord corresponding to the customized state
+   */
+  public CustomizedState(ZNRecord record) {
+    super(record);
+  }
+
+  /**
+   * Get the name of the resource
+   * @return String resource identifier
+   */
+  public String getResourceName() {
+    return _record.getId();
+  }
+
+  /**
+   * Get the partitions on this instance and the value of the specified property that each partition currently has.
+   * @return (partition, property) pairs
+   */
+  public Map<String, String> getPartitionStateMap(CustomizedStateProperty property) {
+    Map<String, String> map = new HashMap<String, String>();
+    Map<String, Map<String, String>> mapFields = _record.getMapFields();
+    for (String partitionName : mapFields.keySet()) {
+      Map<String, String> tempMap = mapFields.get(partitionName);
+      if (tempMap != null) {
+        map.put(partitionName, tempMap.get(property.name()));
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Get the state of a partition on this instance
+   * @param partitionName the name of the partition
+   * @return the state, or null if the partition is not present
+   */
+  public String getState(String partitionName) {
+    return getProperty(partitionName, CustomizedStateProperty.CURRENT_STATE);
+  }
+
+  public long getStartTime(String partitionName) {
+    String startTime = getProperty(partitionName, CustomizedStateProperty.START_TIME);
+    return startTime == null ? -1L : Long.parseLong(startTime);
+  }
+
+  public long getEndTime(String partitionName) {
+    String endTime = getProperty(partitionName, CustomizedStateProperty.END_TIME);
+    return endTime == null ? -1L : Long.parseLong(endTime);
+  }
+
+
+  public String getPreviousState(String partitionName) {
+    return getProperty(partitionName, CustomizedStateProperty.PREVIOUS_STATE);
+  }
+
+  private String getProperty(String partitionName, CustomizedStateProperty property) {
+    Map<String, String> mapField = _record.getMapField(partitionName);
+    if (mapField != null) {
+      return mapField.get(property.name());
+    }
+    return null;
+  }
+
+  /**
+   * Set the state that a partition is currently in on this instance
+   * @param partitionName the name of the partition
+   * @param state the state of the partition
+   */
+  public void setState(String partitionName, String state) {
+    setProperty(partitionName, CustomizedStateProperty.CURRENT_STATE, state);
+  }
+
+  public void setStartTime(String partitionName, long startTime) {
+    setProperty(partitionName, CustomizedStateProperty.START_TIME, String.valueOf(startTime));
+  }
+
+  public void setEndTime(String partitionName, long endTime) {
+    setProperty(partitionName, CustomizedStateProperty.END_TIME, String.valueOf(endTime));
+  }
+
+  public void setPreviousState(String partitionName, String state) {
+    setProperty(partitionName, CustomizedStateProperty.PREVIOUS_STATE, state);
+  }
+
+  private void setProperty(String partitionName, CustomizedStateProperty property, String value) {
+    Map<String, Map<String, String>> mapFields = _record.getMapFields();
+    mapFields.putIfAbsent(partitionName, new TreeMap<String, String>());
+    mapFields.get(partitionName).put(property.name(), value);
+  }
+
+  @Override
+  public boolean isValid() {
+    if (getPartitionStateMap(CustomizedStateProperty.CURRENT_STATE) == null) {
+      LOG.error("Customized state does not contain state map. id:" + getResourceName());
+      return false;
+    }
+    return true;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
index 0ff2e87..d414d54 100644
--- a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
+++ b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
@@ -40,6 +40,11 @@ public class TestPropertyPathBuilder {
     actual = PropertyPathBuilder.instanceCurrentState("test_cluster", "instanceName1", "sessionId");
     AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CURRENTSTATES/sessionId");
 
+    actual = PropertyPathBuilder.instanceCustomizedState("test_cluster", "instanceName1");
+    AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CUSTOMIZEDSTATES");
+    actual = PropertyPathBuilder.instanceCustomizedState("test_cluster", "instanceName1", "customizedState1");
+    AssertJUnit.assertEquals(actual, "/test_cluster/INSTANCES/instanceName1/CUSTOMIZEDSTATES/customizedState1");
+
     actual = PropertyPathBuilder.controller("test_cluster");
     AssertJUnit.assertEquals(actual, "/test_cluster/CONTROLLER");
     actual = PropertyPathBuilder.controllerMessage("test_cluster");
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
new file mode 100644
index 0000000..e0553cd
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
@@ -0,0 +1,152 @@
+package org.apache.helix.integration.paticipant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.customizedstate.CustomizedStateProvider;
+import org.apache.helix.customizedstate.CustomizedStateProviderFactory;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.CustomizedState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
+  private static Logger LOG = LoggerFactory.getLogger(TestCustomizedStateUpdate.class);
+  private final String CUSTOMIZE_STATE_NAME = "testState1";
+  private final String PARTITION_NAME1 = "testPartition1";
+  private final String PARTITION_NAME2 = "testPartition2";
+  private final String RESOURCE_NAME = "testResource1";
+
+  @Test
+  public void testUpdateCustomizedState() throws Exception {
+    HelixManager manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "admin",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
+    manager.connect();
+    _participants[0].connect();
+
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey propertyKey = dataAccessor.keyBuilder()
+        .customizedStates(_participants[0].getInstanceName(), CUSTOMIZE_STATE_NAME);
+    CustomizedState customizedStates = manager.getHelixDataAccessor().getProperty(propertyKey);
+    Assert.assertNull(customizedStates);
+
+    CustomizedStateProvider mockProvider = CustomizedStateProviderFactory.getInstance()
+        .buildCustomizedStateProvider(manager, _participants[0].getInstanceName());
+
+    // test adding customized state for a partition
+    Map<String, String> customizedStateMap = new HashMap<>();
+    customizedStateMap.put("PREVIOUS_STATE", "STARTED");
+    customizedStateMap.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
+    mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
+        customizedStateMap);
+
+    CustomizedState customizedState =
+        mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+    Assert.assertNotNull(customizedState);
+    Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
+    Map<String, Map<String, String>> mapView = customizedState.getRecord().getMapFields();
+    Assert.assertEquals(mapView.keySet().size(), 1);
+    Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2);
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "STARTED");
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");
+
+    // test partial update customized state for previous partition
+    Map<String, String> stateMap1 = new HashMap<>();
+    stateMap1.put("PREVIOUS_STATE", "END_OF_PUSH_RECEIVED");
+    mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
+        stateMap1);
+
+    customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+    Assert.assertNotNull(customizedState);
+    Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
+    mapView = customizedState.getRecord().getMapFields();
+    Assert.assertEquals(mapView.keySet().size(), 1);
+    Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2);
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");
+
+    // test full update customized state for previous partition
+    stateMap1 = new HashMap<>();
+    stateMap1.put("PREVIOUS_STATE", "END_OF_PUSH_RECEIVED");
+    stateMap1.put("CURRENT_STATE", "COMPLETED");
+    mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1,
+        stateMap1);
+
+    customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+    Assert.assertNotNull(customizedState);
+    Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
+    mapView = customizedState.getRecord().getMapFields();
+    Assert.assertEquals(mapView.keySet().size(), 1);
+    Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2);
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "COMPLETED");
+
+    // test adding customized state for a new partition in the same resource
+    Map<String, String> stateMap2 = new HashMap<>();
+    stateMap2.put("PREVIOUS_STATE", "STARTED");
+    stateMap2.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
+    mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2,
+        stateMap2);
+
+    customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+    Assert.assertNotNull(customizedState);
+    Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
+    mapView = customizedState.getRecord().getMapFields();
+    Assert.assertEquals(mapView.keySet().size(), 2);
+    Assert.assertEqualsNoOrder(mapView.keySet().toArray(), new String[] {
+        PARTITION_NAME1, PARTITION_NAME2
+    });
+
+    Map<String, String> partitionMap1 = mockProvider
+        .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
+    Assert.assertEquals(partitionMap1.keySet().size(), 2);
+    Assert.assertEquals(partitionMap1.get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
+    Assert.assertEquals(partitionMap1.get("CURRENT_STATE"), "COMPLETED");
+
+    Map<String, String> partitionMap2 = mockProvider
+        .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2);
+    Assert.assertEquals(partitionMap2.keySet().size(), 2);
+    Assert.assertEquals(partitionMap2.get("PREVIOUS_STATE"), "STARTED");
+    Assert.assertEquals(partitionMap2.get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");
+
+    // test delete customized state for a partition
+    mockProvider.deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
+        PARTITION_NAME1);
+    customizedState = mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+    Assert.assertNotNull(customizedState);
+    Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
+    mapView = customizedState.getRecord().getMapFields();
+    Assert.assertEquals(mapView.keySet().size(), 1);
+    Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME2);
+
+    manager.disconnect();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 595e435..02bd639 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -217,6 +217,9 @@ public class MockHelixAdmin implements HelixAdmin {
         .set(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), new ZNRecord(nodeId),
             0);
     _baseDataAccessor
+        .set(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), new ZNRecord(nodeId),
+            0);
+    _baseDataAccessor
         .set(PropertyPathBuilder.instanceError(clusterName, nodeId), new ZNRecord(nodeId), 0);
     _baseDataAccessor
         .set(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), new ZNRecord(nodeId),