You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/03/20 18:32:58 UTC
[helix] branch customizeView updated: Complete the Routing Table
Provider for CustomizedView (#834)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch customizeView
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/customizeView by this push:
new 3f8ebc3 Complete the Routing Table Provider for CustomizedView (#834)
3f8ebc3 is described below
commit 3f8ebc3a0bf2dbba4d01bdbea71bd339384e2ba0
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Fri Mar 20 11:32:47 2020 -0700
Complete the Routing Table Provider for CustomizedView (#834)
In this commit, the routing table provider has been changed in a way to include customized view feature.
---
.../main/java/org/apache/helix/HelixManager.java | 2 +-
.../main/java/org/apache/helix/PropertyKey.java | 12 +-
.../java/org/apache/helix/PropertyPathBuilder.java | 9 +-
.../helix/common/caches/CustomizedViewCache.java | 16 +-
.../apache/helix/manager/zk/ZKHelixManager.java | 4 +-
.../spectator/CustomizedViewRoutingTable.java | 112 ++++
.../apache/helix/spectator/RoutingDataCache.java | 94 +++-
.../org/apache/helix/spectator/RoutingTable.java | 52 +-
.../helix/spectator/RoutingTableProvider.java | 582 +++++++++++++++------
.../helix/spectator/RoutingTableSnapshot.java | 37 ++
.../controller/stages/DummyClusterManager.java | 2 +-
.../spectator/TestRoutingTableProvider.java | 109 ++++
.../TestRoutingTableProviderPeriodicRefresh.java | 26 +-
.../java/org/apache/helix/mock/MockManager.java | 2 +-
.../helix/participant/MockZKHelixManager.java | 2 +-
15 files changed, 837 insertions(+), 224 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index e4adbd0..5740196 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -242,7 +242,7 @@ public interface HelixManager {
* @see CustomizedViewChangeListener#onCustomizedViewChange(List, NotificationContext)
* @param listener
*/
- void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String aggregationType) throws Exception;
+ void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType) throws Exception;
/**
* @see ExternalViewChangeListener#onExternalViewChange(List, NotificationContext)
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 608c037..57d5925 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -640,20 +640,20 @@ public class PropertyKey {
/**
* Get a property key associated with an {@link CustomizedView} of a type
- * @param aggregationType
+ * @param customizedStateType
* @return {@link PropertyKey}
*/
- public PropertyKey customizedView(String aggregationType) {
- return new PropertyKey(CUSTOMIZEDVIEW, CustomizedView.class, _clusterName, aggregationType);
+ public PropertyKey customizedView(String customizedStateType) {
+ return new PropertyKey(CUSTOMIZEDVIEW, CustomizedView.class, _clusterName, customizedStateType);
}
/**
* Get a property key associated with an {@link CustomizedView} of a type and resource
- * @param aggregationType
+ * @param customizedStateType
* @return {@link PropertyKey}
*/
- public PropertyKey customizedView(String aggregationType, String resourceName) {
- return new PropertyKey(CUSTOMIZEDVIEW, CustomizedView.class, _clusterName, aggregationType,
+ public PropertyKey customizedView(String customizedStateType, String resourceName) {
+ return new PropertyKey(CUSTOMIZEDVIEW, CustomizedView.class, _clusterName, customizedStateType,
resourceName);
}
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index d9bb2b4..f39530e 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
import org.apache.helix.model.ControllerHistory;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.helix.PropertyType.CONFIGS;
import static org.apache.helix.PropertyType.CURRENTSTATES;
+import static org.apache.helix.PropertyType.CUSTOMIZEDVIEW;
import static org.apache.helix.PropertyType.EXTERNALVIEW;
import static org.apache.helix.PropertyType.HISTORY;
import static org.apache.helix.PropertyType.IDEALSTATES;
@@ -70,6 +72,7 @@ public class PropertyPathBuilder {
typeToClassMapping.put(IDEALSTATES, IdealState.class);
typeToClassMapping.put(CONFIGS, InstanceConfig.class);
typeToClassMapping.put(EXTERNALVIEW, ExternalView.class);
+ typeToClassMapping.put(CUSTOMIZEDVIEW, CustomizedView.class);
typeToClassMapping.put(STATEMODELDEFS, StateModelDefinition.class);
typeToClassMapping.put(MESSAGES, Message.class);
typeToClassMapping.put(CURRENTSTATES, CurrentState.class);
@@ -94,6 +97,10 @@ public class PropertyPathBuilder {
addEntry(PropertyType.IDEALSTATES, 2, "/{clusterName}/IDEALSTATES/{resourceName}");
addEntry(PropertyType.EXTERNALVIEW, 1, "/{clusterName}/EXTERNALVIEW");
addEntry(PropertyType.EXTERNALVIEW, 2, "/{clusterName}/EXTERNALVIEW/{resourceName}");
+ addEntry(PropertyType.CUSTOMIZEDVIEW, 1, "/{clusterName}/CUSTOMIZEDVIEW");
+ addEntry(PropertyType.CUSTOMIZEDVIEW, 2, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}");
+ addEntry(PropertyType.CUSTOMIZEDVIEW, 3, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}/{resourceName}");
+
addEntry(PropertyType.TARGETEXTERNALVIEW, 1, "/{clusterName}/TARGETEXTERNALVIEW");
addEntry(PropertyType.TARGETEXTERNALVIEW, 2,
"/{clusterName}/TARGETEXTERNALVIEW/{resourceName}");
@@ -269,7 +276,7 @@ public class PropertyPathBuilder {
public static String externalView(String clusterName, String resourceName) {
return String.format("/%s/EXTERNALVIEW/%s", clusterName, resourceName);
}
-
+
public static String targetExternalView(String clusterName) {
return String.format("/%s/TARGETEXTERNALVIEW", clusterName);
}
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedViewCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedViewCache.java
index a718872..d46be64 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedViewCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedViewCache.java
@@ -47,19 +47,19 @@ public class CustomizedViewCache extends AbstractDataCache<CustomizedView> {
protected Map<String, CustomizedView> _customizedViewCache;
protected String _clusterName;
private PropertyType _propertyType;
- private String _aggregationType;
+ private String _customizedStateType;
- public CustomizedViewCache(String clusterName, String aggregationType) {
- this(clusterName, PropertyType.CUSTOMIZEDVIEW, aggregationType);
+ public CustomizedViewCache(String clusterName, String customizedStateType) {
+ this(clusterName, PropertyType.CUSTOMIZEDVIEW, customizedStateType);
}
- protected CustomizedViewCache(String clusterName, PropertyType propertyType, String aggregationType) {
+ protected CustomizedViewCache(String clusterName, PropertyType propertyType, String customizedStateType) {
super(createDefaultControlContextProvider(clusterName));
_clusterName = clusterName;
_customizedViewMap = Collections.emptyMap();
_customizedViewCache = Collections.emptyMap();
_propertyType = propertyType;
- _aggregationType = aggregationType;
+ _customizedStateType = customizedStateType;
}
@@ -105,14 +105,14 @@ public class CustomizedViewCache extends AbstractDataCache<CustomizedView> {
_customizedViewMap = new HashMap<>(newCustomizedViewMap);
long endTime = System.currentTimeMillis();
- LOG.info("Refresh " + _customizedViewMap.size() + " CustomizedViews of type " + _aggregationType
+ LOG.info("Refresh " + _customizedViewMap.size() + " CustomizedViews of type " + _customizedStateType
+ " for cluster " + _clusterName + ", took " + (endTime - startTime) + " ms");
}
private PropertyKey customizedViewsKey(PropertyKey.Builder keyBuilder) {
PropertyKey customizedViewPropertyKey;
if (_propertyType.equals(PropertyType.CUSTOMIZEDVIEW)){
- customizedViewPropertyKey = keyBuilder.customizedView(_aggregationType);
+ customizedViewPropertyKey = keyBuilder.customizedView(_customizedStateType);
} else {
throw new HelixException(
"Failed to refresh CustomizedViewCache, Wrong property type " + _propertyType + "!");
@@ -123,7 +123,7 @@ public class CustomizedViewCache extends AbstractDataCache<CustomizedView> {
private PropertyKey customizedViewKey(PropertyKey.Builder keyBuilder, String resource) {
PropertyKey customizedViewPropertyKey;
if (_propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) {
- customizedViewPropertyKey = keyBuilder.customizedView(_aggregationType, resource);
+ customizedViewPropertyKey = keyBuilder.customizedView(_customizedStateType, resource);
} else {
throw new HelixException(
"Failed to refresh CustomizedViewCache, Wrong property type " + _propertyType + "!");
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 1fde7a4..7b7f33c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -584,9 +584,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
}
@Override
- public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String aggregationType)
+ public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType)
throws Exception {
- addListener(listener, new Builder(_clusterName).customizedView(aggregationType),
+ addListener(listener, new Builder(_clusterName).customizedView(customizedStateType),
ChangeType.CUSTOMIZED_VIEW, new EventType[] {
EventType.NodeChildrenChanged
});
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/CustomizedViewRoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/CustomizedViewRoutingTable.java
new file mode 100644
index 0000000..5f986ce
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/spectator/CustomizedViewRoutingTable.java
@@ -0,0 +1,112 @@
+package org.apache.helix.spectator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.helix.PropertyType;
+import org.apache.helix.model.CustomizedView;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CustomizedViewRoutingTable extends RoutingTable {
+ private static final Logger logger = LoggerFactory.getLogger(CustomizedViewRoutingTable.class);
+
+ private final Collection<CustomizedView> _customizedViews;
+ /*
+ * The customizedStateType field is the type the controller is aggregating.
+ * For example if RoutingTableProvider initialized using the code below:
+ * Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
+ * sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA", "typeB"));
+ * RoutingTableProvider routingTableProvider =
+ * new RoutingTableProvider(_spectator, sourceDataTypes);
+ * Each one of the TypeA and TypeB is a customizedStateType.
+ */
+ private final String _customizedStateType;
+
+ public CustomizedViewRoutingTable(PropertyType propertyType, String customizedStateType) {
+ this(Collections.<CustomizedView> emptyList(), propertyType, customizedStateType);
+ }
+
+ protected CustomizedViewRoutingTable(Collection<CustomizedView> customizedViews,
+ PropertyType propertytype, String customizedStateType) {
+ this(customizedViews, Collections.<InstanceConfig> emptyList(),
+ Collections.<LiveInstance> emptyList(), propertytype, customizedStateType);
+ }
+
+ protected CustomizedViewRoutingTable(Collection<CustomizedView> customizedViews,
+ Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+ PropertyType propertytype, String customizedStateType) {
+ super(Collections.<ExternalView> emptyList(), instanceConfigs, liveInstances,
+ PropertyType.CUSTOMIZEDVIEW);
+ _customizedStateType = customizedStateType;
+ _customizedViews = new HashSet<>(customizedViews);
+ refresh(_customizedViews);
+ }
+
+ private void refresh(Collection<CustomizedView> customizedViewList) {
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+ if (customizedViewList != null && !customizedViewList.isEmpty()) {
+ for (InstanceConfig config : _instanceConfigs) {
+ instanceConfigMap.put(config.getId(), config);
+ }
+ for (CustomizedView customizeView : customizedViewList) {
+ String resourceName = customizeView.getId();
+ for (String partitionName : customizeView.getPartitionSet()) {
+ Map<String, String> stateMap = customizeView.getStateMap(partitionName);
+ for (String instanceName : stateMap.keySet()) {
+ String customizedState = stateMap.get(instanceName);
+ if (instanceConfigMap.containsKey(instanceName)) {
+ InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
+ addEntry(resourceName, partitionName, customizedState, instanceConfig);
+ } else {
+ logger.warn(
+ "Participant {} is not found with proper configuration information. It might already be removed from the cluster. "
+ + "Skip recording partition assignment entry: Partition {}, Participant {}, State {}.",
+ instanceName, partitionName, instanceName, stateMap.get(instanceName));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns CustomizedView.
+ * @return a collection of CustomizedView
+ */
+ protected Collection<CustomizedView> geCustomizedViews() {
+ return Collections.unmodifiableCollection(_customizedViews);
+ }
+
+ /**
+ * Returns CustomizedStateType
+ * @return the CustomizedStateType of this RoutingTable (Used for CustomizedView)
+ */
+ protected String getStateType() {
+ return _customizedStateType;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index bfed4b5..bfae632 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -19,16 +19,23 @@ package org.apache.helix.spectator;
* under the License.
*/
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.PropertyType;
import org.apache.helix.common.caches.BasicClusterDataCache;
import org.apache.helix.common.caches.CurrentStateCache;
import org.apache.helix.common.caches.CurrentStateSnapshot;
+import org.apache.helix.common.caches.CustomizedViewCache;
import org.apache.helix.common.caches.TargetExternalViewCache;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.LiveInstance;
import org.slf4j.Logger;
@@ -40,14 +47,31 @@ import org.slf4j.LoggerFactory;
class RoutingDataCache extends BasicClusterDataCache {
private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());
- private final PropertyType _sourceDataType;
+ private final Map<PropertyType, List<String>> _sourceDataTypeMap;
+
private CurrentStateCache _currentStateCache;
+ // TODO: CustomizedCache needs to be migrated to propertyCache. Once we migrate all cache to
+ // propertyCache, this hardcoded list of fields won't be necessary.
+ private Map<String, CustomizedViewCache> _customizedViewCaches;
private TargetExternalViewCache _targetExternalViewCache;
public RoutingDataCache(String clusterName, PropertyType sourceDataType) {
+ this (clusterName, ImmutableMap.of(sourceDataType, Collections.emptyList()));
+ }
+
+ /**
+ * Initialize empty RoutingDataCache with clusterName, _propertyTypes.
+ * @param clusterName
+ * @param sourceDataTypeMap
+ */
+ public RoutingDataCache(String clusterName, Map<PropertyType, List<String>> sourceDataTypeMap) {
super(clusterName);
- _sourceDataType = sourceDataType;
+ _sourceDataTypeMap = sourceDataTypeMap;
_currentStateCache = new CurrentStateCache(clusterName);
+ _customizedViewCaches = new HashMap<>();
+ sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())
+ .forEach(customizedStateType -> _customizedViewCaches.put(customizedStateType,
+ new CustomizedViewCache(clusterName, customizedStateType)));
_targetExternalViewCache = new TargetExternalViewCache(clusterName);
requireFullRefresh();
}
@@ -65,25 +89,41 @@ class RoutingDataCache extends BasicClusterDataCache {
long startTime = System.currentTimeMillis();
super.refresh(accessor);
-
- if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW) && _propertyDataChangedMap
- .get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) {
- long start = System.currentTimeMillis();
- _propertyDataChangedMap.put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, false);
- _targetExternalViewCache.refresh(accessor);
- LOG.info("Reload " + _targetExternalViewCache.getExternalViewMap().keySet().size()
- + " TargetExternalViews. Takes " + (System.currentTimeMillis() - start) + " ms");
- }
-
- if (_sourceDataType.equals(PropertyType.CURRENTSTATES) && _propertyDataChangedMap
- .get(HelixConstants.ChangeType.CURRENT_STATE)) {
+ for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
long start = System.currentTimeMillis();
- _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, false);
- Map<String, LiveInstance> liveInstanceMap = getLiveInstances();
- _currentStateCache.refresh(accessor, liveInstanceMap);
- LOG.info("Reload CurrentStates. Takes " + (System.currentTimeMillis() - start) + " ms");
+ switch (propertyType) {
+ case TARGETEXTERNALVIEW:
+ if (_propertyDataChangedMap.get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) {
+ _propertyDataChangedMap.put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, false);
+ _targetExternalViewCache.refresh(accessor);
+ LOG.info("Reload " + _targetExternalViewCache.getExternalViewMap().keySet().size()
+ + " TargetExternalViews. Takes " + (System.currentTimeMillis() - start) + " ms");
+ }
+ break;
+ case CURRENTSTATES:
+ if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE)) {
+ _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, false);
+ Map<String, LiveInstance> liveInstanceMap = getLiveInstances();
+ _currentStateCache.refresh(accessor, liveInstanceMap);
+ LOG.info("Reload CurrentStates. Takes " + (System.currentTimeMillis() - start) + " ms");
+ }
+ break;
+ case CUSTOMIZEDVIEW: {
+ if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CUSTOMIZED_VIEW)) {
+ for (String customizedStateType : _sourceDataTypeMap.get(PropertyType.CUSTOMIZEDVIEW)) {
+ _customizedViewCaches.get(customizedStateType).refresh(accessor);
+ }
+ LOG.info("Reload CustomizedView for types "
+ + _sourceDataTypeMap.get(PropertyType.CUSTOMIZEDVIEW) + " Takes "
+ + (System.currentTimeMillis() - start) + " ms");
+ }
+ _propertyDataChangedMap.put(HelixConstants.ChangeType.CUSTOMIZED_VIEW, false);
+ }
+ break;
+ default:
+ break;
+ }
}
-
long endTime = System.currentTimeMillis();
LOG.info("END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
- startTime) + " ms");
@@ -91,6 +131,10 @@ class RoutingDataCache extends BasicClusterDataCache {
if (LOG.isDebugEnabled()) {
LOG.debug("CurrentStates: " + _currentStateCache);
LOG.debug("TargetExternalViews: " + _targetExternalViewCache.getExternalViewMap());
+ for (String customizedStateType : _sourceDataTypeMap.get(PropertyType.CUSTOMIZEDVIEW)) {
+ LOG.debug("CustomizedViews customizedStateType: " + customizedStateType + " "
+ + _customizedViewCaches.get(customizedStateType).getCustomizedViewMap());
+ }
}
}
@@ -104,6 +148,18 @@ class RoutingDataCache extends BasicClusterDataCache {
}
/**
+ * Retrieves the CustomizedView for all resources
+ * @return
+ */
+ public Map<String, CustomizedView> getCustomizedView(String customizedStateType) {
+ if (_customizedViewCaches.containsKey(customizedStateType)) {
+ return _customizedViewCaches.get(customizedStateType).getCustomizedViewMap();
+ }
+ throw new HelixException(String.format(
+ "customizedStateType %s does not exist in customizedViewCaches.", customizedStateType));
+ }
+
+ /**
* Get map of current states in cluster. {InstanceName -> {SessionId -> {ResourceName ->
* CurrentState}}}
*
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
index dd1b623..89ad16f 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.helix.PropertyType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
@@ -38,42 +39,64 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A class to consume ExternalViews of a cluster and provide {resource, partition, state} to
- * {instances} map function.
+ * A class to consume ExternalViews or CustomizedViews of a cluster and provide
+ * {resource, partition, state} to {instances} map function.
*/
class RoutingTable {
private static final Logger logger = LoggerFactory.getLogger(RoutingTable.class);
// mapping a resourceName to the ResourceInfo
private final Map<String, ResourceInfo> _resourceInfoMap;
+
// mapping a resource group name to a resourceGroupInfo
private final Map<String, ResourceGroupInfo> _resourceGroupInfoMap;
private final Collection<LiveInstance> _liveInstances;
- private final Collection<InstanceConfig> _instanceConfigs;
+ protected final Collection<InstanceConfig> _instanceConfigs;
private final Collection<ExternalView> _externalViews;
+ private final PropertyType _propertyType;
+
+ @Deprecated
public RoutingTable() {
this(Collections.<ExternalView> emptyList(), Collections.<InstanceConfig> emptyList(),
Collections.<LiveInstance> emptyList());
}
+ /**
+ * Initialize empty RoutingTable and set _propertyType fields.
+ * @param propertyType
+ */
+ protected RoutingTable(PropertyType propertyType) {
+ this(Collections.<ExternalView> emptyList(), Collections.<InstanceConfig> emptyList(),
+ Collections.<LiveInstance> emptyList(), propertyType);
+ }
+
public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
// TODO Aggregate currentState to an ExternalView in the RoutingTable, so there is no need to
// refresh according to the currentStateMap. - jjwang
- this(Collections.<ExternalView> emptyList(), instanceConfigs, liveInstances);
+ this(Collections.<ExternalView> emptyList(),
+ instanceConfigs, liveInstances, PropertyType.CURRENTSTATES);
refresh(currentStateMap);
}
public RoutingTable(Collection<ExternalView> externalViews,
Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+ this(externalViews, instanceConfigs, liveInstances,
+ PropertyType.EXTERNALVIEW);
+ }
+
+ protected RoutingTable(Collection<ExternalView> externalViews, Collection<InstanceConfig> instanceConfigs,
+ Collection<LiveInstance> liveInstances, PropertyType propertytype) {
+ // TODO Refactor these constructors so we don't have so many constructor.
+ _propertyType = propertytype;
_resourceInfoMap = new HashMap<>();
_resourceGroupInfoMap = new HashMap<>();
_liveInstances = new HashSet<>(liveInstances);
_instanceConfigs = new HashSet<>(instanceConfigs);
_externalViews = new HashSet<>(externalViews);
- refresh(externalViews);
+ refresh(_externalViews);
}
private void refresh(Collection<ExternalView> externalViewList) {
@@ -145,7 +168,7 @@ class RoutingTable {
}
}
- private void addEntry(String resourceName, String partitionName, String state,
+ protected void addEntry(String resourceName, String partitionName, String state,
InstanceConfig config) {
if (!_resourceInfoMap.containsKey(resourceName)) {
_resourceInfoMap.put(resourceName, new ResourceInfo());
@@ -352,6 +375,23 @@ class RoutingTable {
}
/**
+ * Returns PropertyTYpe
+ * @return the PropertyTYpe of this RoutingTable
+ */
+ protected PropertyType getPropertyType() {
+ return _propertyType;
+ }
+
+ /**
+ * Returns CustomizedStateType
+ * @return the CustomizedStateType of this RoutingTable (Used for CustomizedView)
+ */
+ protected String getStateType() {
+ return RoutingTableProvider.DEFAULT_STATE_TYPE;
+ }
+
+
+ /**
* Class to store instances, partitions and their states for each resource.
*/
class ResourceInfo {
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 1a8f641..1e11965 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.collect.ImmutableMap;
import javax.management.JMException;
import org.apache.helix.HelixConstants;
@@ -44,6 +45,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.api.listeners.ConfigChangeListener;
import org.apache.helix.api.listeners.CurrentStateChangeListener;
+import org.apache.helix.api.listeners.CustomizedViewChangeListener;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
@@ -55,6 +57,7 @@ import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -64,15 +67,15 @@ import org.slf4j.LoggerFactory;
public class RoutingTableProvider
implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener,
- LiveInstanceChangeListener, CurrentStateChangeListener {
+ LiveInstanceChangeListener, CurrentStateChangeListener, CustomizedViewChangeListener {
private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class);
private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000L; // 5 minutes
- private final AtomicReference<RoutingTable> _routingTableRef;
+ private final Map<String, AtomicReference<RoutingTable>> _routingTableRefMap;
private final HelixManager _helixManager;
private final RouterUpdater _routerUpdater;
- private final PropertyType _sourceDataType;
+ private final Map<PropertyType, List<String>> _sourceDataTypeMap;
private final Map<RoutingTableChangeListener, ListenerContext> _routingTableChangeListenerMap;
- private final RoutingTableProviderMonitor _monitor;
+ private final Map<PropertyType, RoutingTableProviderMonitor> _monitorMap;
// For periodic refresh
private long _lastRefreshTimestamp;
@@ -83,17 +86,27 @@ public class RoutingTableProvider
private ExecutorService _reportExecutor;
private Future _reportingTask = null;
+ protected static final String DEFAULT_PROPERTY_TYPE = "HELIX_DEFAULT_PROPERTY";
+ protected static final String DEFAULT_STATE_TYPE = "HELIX_DEFAULT";
+
+
public RoutingTableProvider() {
this(null);
}
public RoutingTableProvider(HelixManager helixManager) throws HelixException {
- this(helixManager, PropertyType.EXTERNALVIEW, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
+ this(helixManager, ImmutableMap.of(PropertyType.EXTERNALVIEW, Collections.emptyList()), true,
+ DEFAULT_PERIODIC_REFRESH_INTERVAL);
}
public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType)
throws HelixException {
- this(helixManager, sourceDataType, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
+ this(helixManager, ImmutableMap.of(sourceDataType, Collections.emptyList()), true,
+ DEFAULT_PERIODIC_REFRESH_INTERVAL);
+ }
+
+ public RoutingTableProvider(HelixManager helixManager, Map<PropertyType, List<String>> sourceDataTypeMap) {
+ this(helixManager, sourceDataTypeMap, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
}
/**
@@ -106,73 +119,75 @@ public class RoutingTableProvider
*/
public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType,
boolean isPeriodicRefreshEnabled, long periodRefreshInterval) throws HelixException {
- _routingTableRef = new AtomicReference<>(new RoutingTable());
+ this(helixManager, ImmutableMap.of(sourceDataType, Collections.emptyList()),
+ isPeriodicRefreshEnabled, periodRefreshInterval);
+ }
+
+ /**
+ * Initialize an instance of RoutingTableProvider
+ * @param helixManager
+ * @param sourceDataTypeMap
+ * @param isPeriodicRefreshEnabled true if periodic refresh is enabled, false otherwise
+ * @param periodRefreshInterval only effective if isPeriodRefreshEnabled is true
+ * @throws HelixException
+ */
+ public RoutingTableProvider(HelixManager helixManager,
+ Map<PropertyType, List<String>> sourceDataTypeMap, boolean isPeriodicRefreshEnabled,
+ long periodRefreshInterval) throws HelixException {
+
+ validateSourceDataTypeMap(sourceDataTypeMap);
+
+ _routingTableRefMap = new HashMap<>();
_helixManager = helixManager;
- _sourceDataType = sourceDataType;
+ _sourceDataTypeMap = sourceDataTypeMap;
_routingTableChangeListenerMap = new ConcurrentHashMap<>();
String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
- _monitor = new RoutingTableProviderMonitor(_sourceDataType, clusterName);
- try {
- _monitor.register();
- } catch (JMException e) {
- logger.error("Failed to register RoutingTableProvider monitor MBean.", e);
- }
- _reportExecutor = Executors.newSingleThreadExecutor();
-
- _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
- _routerUpdater.start();
-
- if (_helixManager != null) {
- switch (_sourceDataType) {
- case EXTERNALVIEW:
- try {
- _helixManager.addExternalViewChangeListener(this);
- } catch (Exception e) {
- shutdown();
- logger.error("Failed to attach ExternalView Listener to HelixManager!");
- throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
+ // Initialize the tables
+ for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+ if (_sourceDataTypeMap.get(propertyType).size() == 0) {
+ if (propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) {
+ throw new HelixException("CustomizedView has been used without any aggregation type!");
}
- break;
-
- case TARGETEXTERNALVIEW:
- // Check whether target external has been enabled or not
- if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
- _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), 0)) {
- shutdown();
- throw new HelixException("Target External View is not enabled!");
+ String key = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
+ if (_routingTableRefMap.get(key) == null) {
+ _routingTableRefMap.put(key, new AtomicReference<>(new RoutingTable(propertyType)));
}
-
- try {
- _helixManager.addTargetExternalViewChangeListener(this);
- } catch (Exception e) {
- shutdown();
- logger.error("Failed to attach TargetExternalView Listener to HelixManager!");
- throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!",
- e);
+ } else {
+ if (!propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) {
+ throw new HelixException(
+ String.format("Type %s has been used in addition to the propertyType %s !",
+ sourceDataTypeMap.get(propertyType), propertyType.name()));
+ }
+ for (String customizedStateType : _sourceDataTypeMap.get(propertyType)) {
+ String key = generateReferenceKey(propertyType.name(), customizedStateType);
+ if (_routingTableRefMap.get(key) == null) {
+ _routingTableRefMap.put(key, new AtomicReference<>(
+ new CustomizedViewRoutingTable(propertyType, customizedStateType)));
+ }
}
- break;
-
- case CURRENTSTATES:
- // CurrentState change listeners will be added later in LiveInstanceChange call.
- break;
-
- default:
- throw new HelixException(String.format("Unsupported source data type: %s", sourceDataType));
}
+ }
+
+ // Start Monitoring
+ _monitorMap = new HashMap<>();
+ for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+ _monitorMap.put(propertyType, new RoutingTableProviderMonitor(propertyType, clusterName));
try {
- _helixManager.addInstanceConfigChangeListener(this);
- _helixManager.addLiveInstanceChangeListener(this);
- } catch (Exception e) {
- shutdown();
- logger.error(
- "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!");
- throw new HelixException(
- "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!",
- e);
+ _monitorMap.get(propertyType).register();
+ } catch (JMException e) {
+ logger.error("Failed to register RoutingTableProvider monitor MBean.", e);
}
}
+ _reportExecutor = Executors.newSingleThreadExecutor();
+
+ // Start Updaters
+ _routerUpdater = new RouterUpdater(clusterName, sourceDataTypeMap);
+ _routerUpdater.start();
+
+ // Add listeners
+ addListeners();
// For periodic refresh
if (isPeriodicRefreshEnabled && _helixManager != null) {
@@ -200,6 +215,92 @@ public class RoutingTableProvider
}
/**
+ * A method that adds the ChangeListeners to HelixManager
+ */
+ private void addListeners() {
+ if (_helixManager != null) {
+ for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+ switch (propertyType) {
+ case EXTERNALVIEW:
+ try {
+ _helixManager.addExternalViewChangeListener(this);
+ } catch (Exception e) {
+ shutdown();
+ throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
+ }
+ break;
+ case CUSTOMIZEDVIEW:
+ List<String> customizedStateTypes = _sourceDataTypeMap.get(propertyType);
+ for (String customizedStateType : customizedStateTypes) {
+ try {
+ _helixManager.addCustomizedViewChangeListener(this, customizedStateType);
+ } catch (Exception e) {
+ shutdown();
+ throw new HelixException(String.format(
+ "Failed to attach CustomizedView Listener to HelixManager for type %s!",
+ customizedStateType), e);
+ }
+ }
+ break;
+ case TARGETEXTERNALVIEW:
+ // Check whether target external has been enabled or not
+ if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
+ _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
+ 0)) {
+ shutdown();
+ throw new HelixException("Target External View is not enabled!");
+ }
+
+ try {
+ _helixManager.addTargetExternalViewChangeListener(this);
+ } catch (Exception e) {
+ shutdown();
+ throw new HelixException(
+ "Failed to attach TargetExternalView Listener to HelixManager!", e);
+ }
+ break;
+ case CURRENTSTATES:
+ // CurrentState change listeners will be added later in LiveInstanceChange call.
+ break;
+ default:
+ throw new HelixException(String.format("Unsupported source data type: %s", propertyType));
+ }
+ }
+ try {
+ _helixManager.addInstanceConfigChangeListener(this);
+ _helixManager.addLiveInstanceChangeListener(this);
+ } catch (Exception e) {
+ shutdown();
+ throw new HelixException(
+ "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!",
+ e);
+ }
+ }
+ }
+
+ /**
+ * Check and validate the input of the sourceDataTypeMap parameter
+ * @param sourceDataTypeMap
+ */
+ private void validateSourceDataTypeMap(Map<PropertyType, List<String>> sourceDataTypeMap) {
+ for (PropertyType propertyType : sourceDataTypeMap.keySet()) {
+ if (propertyType.equals(PropertyType.CUSTOMIZEDVIEW)
+ && sourceDataTypeMap.get(propertyType).size() == 0) {
+ logger.error("CustomizedView has been used without any aggregation type!");
+ throw new HelixException("CustomizedView has been used without any aggregation type!");
+ }
+ if (!propertyType.equals(PropertyType.CUSTOMIZEDVIEW)
+ && sourceDataTypeMap.get(propertyType).size() != 0) {
+ logger.error("Type has been used in addition to the propertyType {} !",
+ propertyType.name());
+ throw new HelixException(
+ String.format("Type %s has been used in addition to the propertyType %s !",
+ sourceDataTypeMap.get(propertyType), propertyType.name()));
+ }
+ }
+ }
+
+ /**
* Shutdown current RoutingTableProvider. Once it is shutdown, it should never be reused.
*/
public void shutdown() {
@@ -209,24 +310,36 @@ public class RoutingTableProvider
}
_routerUpdater.shutdown();
- _monitor.unregister();
+
+ for (PropertyType propertyType : _monitorMap.keySet()) {
+ _monitorMap.get(propertyType).unregister();
+ }
if (_helixManager != null) {
PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
- switch (_sourceDataType) {
- case EXTERNALVIEW:
- _helixManager.removeListener(keyBuilder.externalViews(), this);
- break;
- case TARGETEXTERNALVIEW:
- _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
- break;
- case CURRENTSTATES:
- NotificationContext context = new NotificationContext(_helixManager);
- context.setType(NotificationContext.Type.FINALIZE);
- updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context);
- break;
- default:
- break;
+ for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+ switch (propertyType) {
+ case EXTERNALVIEW:
+ _helixManager.removeListener(keyBuilder.externalViews(), this);
+ break;
+ case CUSTOMIZEDVIEW:
+ List<String> customizedStateTypes = _sourceDataTypeMap.get(propertyType);
+ // Remove listener on each individual customizedStateType
+ for (String customizedStateType : customizedStateTypes) {
+ _helixManager.removeListener(keyBuilder.customizedView(customizedStateType), this);
+ }
+ break;
+ case TARGETEXTERNALVIEW:
+ _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
+ break;
+ case CURRENTSTATES:
+ NotificationContext context = new NotificationContext(_helixManager);
+ context.setType(NotificationContext.Type.FINALIZE);
+ updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context);
+ break;
+ default:
+ break;
+ }
}
}
}
@@ -237,7 +350,46 @@ public class RoutingTableProvider
* @return snapshot of current routing table.
*/
public RoutingTableSnapshot getRoutingTableSnapshot() {
- return new RoutingTableSnapshot(_routingTableRef.get());
+ return new RoutingTableSnapshot(getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE));
+ }
+
+ /**
+ * Get an snapshot of current RoutingTable information for specific PropertyType.
+ * The snapshot is immutable, it reflects the routing table information at the time this method is
+ * called.
+ * @return snapshot of current routing table.
+ */
+ public RoutingTableSnapshot getRoutingTableSnapshot(PropertyType propertyType) {
+ return new RoutingTableSnapshot(getRoutingTableRef(propertyType.name(), DEFAULT_STATE_TYPE));
+ }
+
+ /**
+ * Get an snapshot of all of the available RoutingTable information. The snapshot is immutable, it
+ * reflects the routing table information at the time this method is called.
+ * @return snapshot associated with specific propertyType and type.
+ */
+ public RoutingTableSnapshot getRoutingTableSnapshot(PropertyType propertyType, String stateType) {
+ return new RoutingTableSnapshot(getRoutingTableRef(propertyType.name(), stateType));
+ }
+
+ /**
+ * Get an snapshot of all of the available RoutingTable information. The snapshot is immutable, it
+ * reflects the routing table information at the time this method is called.
+ * @return all of the available snapshots of current routing table.
+ */
+ public Map<String, Map<String, RoutingTableSnapshot>> getRoutingTableSnapshots() {
+ Map<String, Map<String, RoutingTableSnapshot>> snapshots = new HashMap<>();
+ for (String key : _routingTableRefMap.keySet()) {
+ RoutingTable routingTable = _routingTableRefMap.get(key).get();
+ String propertyTypeName = routingTable.getPropertyType().name();
+ String customizedStateType = routingTable.getStateType();
+ if (!snapshots.containsKey(propertyTypeName)) {
+ snapshots.put(propertyTypeName, new HashMap<>());
+ }
+ snapshots.get(propertyTypeName).put(customizedStateType,
+ new RoutingTableSnapshot(routingTable));
+ }
+ return snapshots;
}
/**
@@ -264,12 +416,10 @@ public class RoutingTableProvider
}
/**
- * returns the instances for {resource,partition} pair that are in a specific
- * {state}
+ * returns the instances for {resource,partition} pair that are in a specific {state}.
* This method will be deprecated, please use the
* {@link #getInstancesForResource(String, String, String)} getInstancesForResource} method.
* @param resourceName
- * -
* @param partitionName
* @param state
* @return empty list if there is no instance in a given state
@@ -280,17 +430,16 @@ public class RoutingTableProvider
}
/**
- * returns the instances for {resource,partition} pair that are in a specific
- * {state}
+ * returns the instances for {resource,partition} pair that are in a specific {state}
* @param resourceName
- * -
* @param partitionName
* @param state
* @return empty list if there is no instance in a given state
*/
public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName,
String state) {
- return _routingTableRef.get().getInstancesForResource(resourceName, partitionName, state);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResource(resourceName, partitionName, state);
}
/**
@@ -305,8 +454,8 @@ public class RoutingTableProvider
*/
public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
String partitionName, String state) {
- return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName,
- state);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResourceGroup(resourceGroupName, partitionName, state);
}
/**
@@ -322,11 +471,12 @@ public class RoutingTableProvider
*/
public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
String partitionName, String state, List<String> resourceTags) {
- return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName,
- state, resourceTags);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags);
}
/**
+ * For specific routing table associated with {propertyType, stateType}
* returns all instances for {resource} that are in a specific {state}
* This method will be deprecated, please use the
* {@link #getInstancesForResource(String, String) getInstancesForResource} method.
@@ -345,7 +495,8 @@ public class RoutingTableProvider
* @return empty list if there is no instance in a given state
*/
public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) {
- return _routingTableRef.get().getInstancesForResource(resourceName, state);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResource(resourceName, state);
}
/**
@@ -355,7 +506,8 @@ public class RoutingTableProvider
* @return empty list if there is no instance in a given state
*/
public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
- return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResourceGroup(resourceGroupName, state);
}
/**
@@ -367,8 +519,8 @@ public class RoutingTableProvider
*/
public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
List<String> resourceTags) {
- return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state,
- resourceTags);
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+ .getInstancesForResourceGroup(resourceGroupName, state, resourceTags);
}
/**
@@ -376,7 +528,11 @@ public class RoutingTableProvider
* @return
*/
public Collection<LiveInstance> getLiveInstances() {
- return _routingTableRef.get().getLiveInstances();
+ // Since line instances will be the same across all _routingTableRefMap, here one of the keys
+ // will be used without considering PropertyType
+ // TODO each table will keep a separate instance list.This can be improve by only keeping one
+ // copy of the data
+ return _routingTableRefMap.values().iterator().next().get().getLiveInstances();
}
/**
@@ -384,14 +540,58 @@ public class RoutingTableProvider
* @return
*/
public Collection<InstanceConfig> getInstanceConfigs() {
- return _routingTableRef.get().getInstanceConfigs();
+ // Since line instances will be the same across all _routingTableRefMap, here one of the keys
+ // will be used without considering PropertyType
+ // TODO each table will keep a separate instance list.This can be improve by only keeping one copy of the data
+ return _routingTableRefMap.values().iterator().next().get().getInstanceConfigs();
}
/**
- * Return names of all resources (shown in ExternalView) in this cluster.
+ * Return names of all resources (shown in ExternalView or CustomizedView) in this cluster.
*/
public Collection<String> getResources() {
- return _routingTableRef.get().getResources();
+ return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE).getResources();
+ }
+
+ /**
+ * Provide the key associated with specific PropertyType and StateType for _routingTableRefMap lookup.
+ * @param propertyTypeName
+ * @param stateType
+ * @return
+ */
+ private RoutingTable getRoutingTableRef(String propertyTypeName, String stateType) {
+ if (propertyTypeName.equals(DEFAULT_PROPERTY_TYPE)) {
+ // Check whether there exist only one snapshot (_routingTableRefMap)
+ if (_routingTableRefMap.keySet().size() == 1) {
+ String key = _routingTableRefMap.keySet().iterator().next();
+ if (!_routingTableRefMap.containsKey(key)) {
+ throw new HelixException(
+ String.format("Currently there is no snapshot available for PropertyType %s and stateType %s",
+ propertyTypeName, stateType));
+ }
+ return _routingTableRefMap.get(key).get();
+ } else {
+ throw new HelixException("There is none or more than one RoutingTableSnapshot");
+ }
+ }
+
+ if (stateType.equals(DEFAULT_STATE_TYPE)) {
+ if (propertyTypeName.equals(PropertyType.CUSTOMIZEDVIEW.name())) {
+ throw new HelixException("Specific type needs to be used for CUSTOMIZEDVIEW PropertyType");
+ }
+ }
+
+ String key = generateReferenceKey(propertyTypeName, stateType);
+ if (!_routingTableRefMap.containsKey(key)) {
+ throw new HelixException(
+ String.format("Currently there is no snapshot available for PropertyType %s and stateType %s",
+ propertyTypeName, stateType));
+ }
+ return _routingTableRefMap.get(key).get();
+ }
+
+ private String generateReferenceKey(String propertyType, String stateType) {
+ return propertyType + "_" + stateType;
}
@Override
@@ -399,27 +599,32 @@ public class RoutingTableProvider
public void onExternalViewChange(List<ExternalView> externalViewList,
NotificationContext changeContext) {
HelixConstants.ChangeType changeType = changeContext.getChangeType();
- if (changeType != null && !changeType.getPropertyType().equals(_sourceDataType)) {
+ if (changeType != null && !_sourceDataTypeMap.containsKey(changeType.getPropertyType())) {
logger.warn(
- "onExternalViewChange called with mismatched change types. Source data type {}, changed data type: {}",
- _sourceDataType, changeType);
+ "onExternalViewChange called with mismatched change types. Source data types does not contain changed data type: {}",
+ changeType);
return;
}
// Refresh with full list of external view.
if (externalViewList != null && externalViewList.size() > 0) {
// keep this here for back-compatibility, application can call onExternalViewChange directly
// with externalview list supplied.
- refresh(externalViewList, changeContext);
+ String keyReference = generateReferenceKey(PropertyType.EXTERNALVIEW.name(), DEFAULT_STATE_TYPE);
+ HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
+ List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances());
+ refreshExternalView(externalViewList, configList, liveInstances, keyReference);
} else {
ClusterEventType eventType;
- if (_sourceDataType.equals(PropertyType.EXTERNALVIEW)) {
+ if (_sourceDataTypeMap.containsKey(PropertyType.EXTERNALVIEW)) {
eventType = ClusterEventType.ExternalViewChange;
- } else if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW)) {
+ } else if (_sourceDataTypeMap.containsKey(PropertyType.TARGETEXTERNALVIEW)) {
eventType = ClusterEventType.TargetExternalViewChange;
} else {
logger.warn(
- "onExternalViewChange called with mismatched change types. Source data type {}, change type: {}",
- _sourceDataType, changeType);
+ "onExternalViewChange called with mismatched change types. Source data types does not contain changed data type: {}",
+ changeType);
return;
}
_routerUpdater.queueEvent(changeContext, eventType, changeType);
@@ -444,11 +649,10 @@ public class RoutingTableProvider
@PreFetch(enabled = true)
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
- if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
+ if (_sourceDataTypeMap.containsKey(PropertyType.CURRENTSTATES)) {
// Go though the live instance list and update CurrentState listeners
updateCurrentStatesListeners(liveInstances, changeContext);
}
-
_routerUpdater.queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
HelixConstants.ChangeType.LIVE_INSTANCE);
}
@@ -457,7 +661,7 @@ public class RoutingTableProvider
@PreFetch(enabled = false)
public void onStateChange(String instanceName, List<CurrentState> statesInfo,
NotificationContext changeContext) {
- if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
+ if (_sourceDataTypeMap.containsKey(PropertyType.CURRENTSTATES)) {
_routerUpdater.queueEvent(changeContext, ClusterEventType.CurrentStateChange,
HelixConstants.ChangeType.CURRENT_STATE);
} else {
@@ -466,6 +670,19 @@ public class RoutingTableProvider
}
}
+ @Override
+ @PreFetch(enabled = false)
+ public void onCustomizedViewChange(List<CustomizedView> customizedViewList,
+ NotificationContext changeContext) {
+ if (_sourceDataTypeMap.containsKey(PropertyType.CUSTOMIZEDVIEW)) {
+ _routerUpdater.queueEvent(changeContext, ClusterEventType.CustomizedViewChange,
+ HelixConstants.ChangeType.CUSTOMIZED_VIEW);
+ } else {
+ logger.warn(
+ "RoutingTableProvider does not use CurrentStates as source, ignore CurrentState changes!");
+ }
+ }
+
final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions = new AtomicReference<>();
/**
@@ -529,43 +746,58 @@ public class RoutingTableProvider
private void reset() {
logger.info("Resetting the routing table.");
- RoutingTable newRoutingTable = new RoutingTable();
- _routingTableRef.set(newRoutingTable);
+ RoutingTable newRoutingTable;
+ for (String key: _routingTableRefMap.keySet()) {
+ PropertyType propertyType = _routingTableRefMap.get(key).get().getPropertyType();
+ if (propertyType == PropertyType.CUSTOMIZEDVIEW) {
+ String stateType = _routingTableRefMap.get(key).get().getStateType();
+ newRoutingTable = new CustomizedViewRoutingTable(propertyType, stateType);
+ } else {
+ newRoutingTable = new RoutingTable(propertyType);
+ }
+ _routingTableRefMap.get(key).set(newRoutingTable);
+ }
}
- protected void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) {
- HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
- List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
- List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances());
- refresh(externalViewList, configList, liveInstances);
+ protected void refreshExternalView(Collection<ExternalView> externalViews,
+ Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+ String referenceKey) {
+ long startTime = System.currentTimeMillis();
+ PropertyType propertyType = _routingTableRefMap.get(referenceKey).get().getPropertyType();
+ RoutingTable newRoutingTable =
+ new RoutingTable(externalViews, instanceConfigs, liveInstances, propertyType);
+ resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey);
}
- protected void refresh(Collection<ExternalView> externalViews,
- Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+ protected void refreshCustomizedView(Collection<CustomizedView> customizedViews,
+ Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+ String referenceKey) {
long startTime = System.currentTimeMillis();
- RoutingTable newRoutingTable = new RoutingTable(externalViews, instanceConfigs, liveInstances);
- resetRoutingTableAndNotify(startTime, newRoutingTable);
+ PropertyType propertyType = _routingTableRefMap.get(referenceKey).get().getPropertyType();
+ String customizedStateType = _routingTableRefMap.get(referenceKey).get().getStateType();
+ RoutingTable newRoutingTable = new CustomizedViewRoutingTable(customizedViews, instanceConfigs,
+ liveInstances, propertyType, customizedStateType);
+ resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey);
}
- protected void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
- Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+ protected void refreshCurrentState(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
+ Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+ String referenceKey) {
long startTime = System.currentTimeMillis();
RoutingTable newRoutingTable =
new RoutingTable(currentStateMap, instanceConfigs, liveInstances);
- resetRoutingTableAndNotify(startTime, newRoutingTable);
+ resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey);
}
- private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable) {
- _routingTableRef.set(newRoutingTable);
+ private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable, String referenceKey) {
+ _routingTableRefMap.get(referenceKey).set(newRoutingTable);
String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
logger.info("Refreshed the RoutingTable for cluster {}, took {} ms.", clusterName,
(System.currentTimeMillis() - startTime));
// TODO: move the callback user code logic to separate thread upon routing table statePropagation latency
// integration test result. If the latency is more than 2 secs, we need to change this part.
- notifyRoutingTableChange(clusterName);
+ notifyRoutingTableChange(clusterName, referenceKey);
// Update timestamp for last refresh
if (_isPeriodicRefreshEnabled) {
@@ -573,13 +805,16 @@ public class RoutingTableProvider
}
}
- private void notifyRoutingTableChange(String clusterName) {
- // This call back is called in the main event queue of RoutingTableProvider. We add log to record time spent
+ private void notifyRoutingTableChange(String clusterName, String referenceKey) {
+ // This call back is called in the main event queue of RoutingTableProvider. We add log to
+ // record time spent
// here. Potentially, we should call this callback in a separate thread if this is a bottleneck.
long startTime = System.currentTimeMillis();
- for (Map.Entry<RoutingTableChangeListener, ListenerContext> entry : _routingTableChangeListenerMap.entrySet()) {
- entry.getKey()
- .onRoutingTableChange(new RoutingTableSnapshot(_routingTableRef.get()), entry.getValue().getContext());
+ for (Map.Entry<RoutingTableChangeListener, ListenerContext> entry : _routingTableChangeListenerMap
+ .entrySet()) {
+ entry.getKey().onRoutingTableChange(
+ new RoutingTableSnapshot(_routingTableRefMap.get(referenceKey).get()),
+ entry.getValue().getContext());
}
logger.info("RoutingTableProvider user callback time for cluster {}, took {} ms.", clusterName,
(System.currentTimeMillis() - startTime));
@@ -587,10 +822,12 @@ public class RoutingTableProvider
private class RouterUpdater extends ClusterEventProcessor {
private final RoutingDataCache _dataCache;
+ private final Map<PropertyType, List<String>> _sourceDataTypeMap;
- public RouterUpdater(String clusterName, PropertyType sourceDataType) {
+ public RouterUpdater(String clusterName, Map<PropertyType, List<String>> sourceDataTypeMap) {
super(clusterName, "Helix-RouterUpdater-event_process");
- _dataCache = new RoutingDataCache(clusterName, sourceDataType);
+ _sourceDataTypeMap = sourceDataTypeMap;
+ _dataCache = new RoutingDataCache(clusterName, _sourceDataTypeMap);
}
@Override
@@ -618,36 +855,49 @@ public class RoutingTableProvider
throw new HelixException("HelixManager is null for router update event.");
}
if (!manager.isConnected()) {
- logger.error(
- String.format("HelixManager is not connected for router update event: %s", event));
+ logger.error(String.format("HelixManager is not connected for router update event: %s", event));
throw new HelixException("HelixManager is not connected for router update event.");
}
long startTime = System.currentTimeMillis();
_dataCache.refresh(manager.getHelixDataAccessor());
- switch (_sourceDataType) {
- case EXTERNALVIEW:
- refresh(_dataCache.getExternalViews().values(),
- _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
- break;
- case TARGETEXTERNALVIEW:
- refresh(_dataCache.getTargetExternalViews().values(),
- _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
- break;
- case CURRENTSTATES:
- refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
- _dataCache.getLiveInstances().values());
+ for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+ switch (propertyType) {
+ case EXTERNALVIEW: {
+ String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
+ refreshExternalView(_dataCache.getExternalViews().values(),
+ _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(),
+ keyReference);
+ }
+ break;
+ case TARGETEXTERNALVIEW: {
+ String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
+ refreshExternalView(_dataCache.getTargetExternalViews().values(),
+ _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(),
+ keyReference);
+ }
+ break;
+ case CUSTOMIZEDVIEW:
+ for (String customizedStateType : _sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) {
+ String keyReference = generateReferenceKey(propertyType.name(), customizedStateType);
+ refreshCustomizedView(_dataCache.getCustomizedView(customizedStateType).values(),
+ _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(), keyReference);
+ }
+ break;
+ case CURRENTSTATES: {
+ String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);;
+ refreshCurrentState(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
+ _dataCache.getLiveInstances().values(), keyReference);
+ recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot());
+ }
+ break;
+ default:
+ logger.warn("Unsupported source data type: {}, stop refreshing the routing table!", propertyType);
+ }
- recordPropagationLatency(System.currentTimeMillis(),
- _dataCache.getCurrentStateSnapshot());
- break;
- default:
- logger.warn("Unsupported source data type: {}, stop refreshing the routing table!",
- _sourceDataType);
+ _monitorMap.get(propertyType).increaseDataRefreshCounters(startTime);
}
-
- _monitor.increaseDataRefreshCounters(startTime);
}
}
@@ -673,10 +923,12 @@ public class RoutingTableProvider
for (String partition : partitionStateEndTimes.keySet()) {
long endTime = partitionStateEndTimes.get(partition);
if (currentTime >= endTime) {
- _monitor.recordStatePropagationLatency(currentTime - endTime);
- logger.debug(
- "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
- key.toString(), partition, endTime, currentTime - endTime);
+ for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+ _monitorMap.get(propertyType).recordStatePropagationLatency(currentTime - endTime);
+ logger.debug(
+ "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
+ key.toString(), partition, endTime, currentTime - endTime);
+ }
} else {
// Verbose log in case currentTime < endTime. This could be the case that Router
// clock is slower than the participant clock.
@@ -692,6 +944,7 @@ public class RoutingTableProvider
}
}
+
public void queueEvent(NotificationContext context, ClusterEventType eventType,
HelixConstants.ChangeType changeType) {
ClusterEvent event = new ClusterEvent(_clusterName, eventType);
@@ -700,12 +953,17 @@ public class RoutingTableProvider
event.addAttribute(AttributeName.helixmanager.name(), context.getManager());
event.addAttribute(AttributeName.changeContext.name(), context);
queueEvent(event);
-
- _monitor.increaseCallbackCounters(_eventQueue.size());
+ // TODO: Split the monitor into. One is general router callback tracking. The other one is for
+ // each type of tracking.
+ // TODO: We may need to add more complexity to the customized view monitor for each state
+ // type.
+ for (PropertyType propertyType : _monitorMap.keySet()) {
+ _monitorMap.get(propertyType).increaseCallbackCounters(_eventQueue.size());
+ }
}
}
- private class ListenerContext {
+ protected class ListenerContext {
private Object _context;
public ListenerContext(Object context) {
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
index abfc87b..e7cc6bb 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
@@ -20,9 +20,12 @@ package org.apache.helix.spectator;
*/
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.apache.helix.PropertyType;
+import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -33,9 +36,13 @@ import org.apache.helix.model.LiveInstance;
*/
public class RoutingTableSnapshot {
private final RoutingTable _routingTable;
+ private final PropertyType _propertyType;
+ private final String _stateType;
public RoutingTableSnapshot(RoutingTable routingTable) {
_routingTable = routingTable;
+ _propertyType = routingTable.getPropertyType();
+ _stateType = routingTable.getStateType();
}
/**
@@ -145,4 +152,34 @@ public class RoutingTableSnapshot {
public Collection<ExternalView> getExternalViews() {
return _routingTable.getExternalViews();
}
+
+ /**
+ * Returns a Collection of latest snapshot of CustomizedView. Note that if the RoutingTable is
+ * instantiated using CurrentStates, this Collection will be empty.
+ * @return
+ */
+ public Collection<CustomizedView> getCustomizeViews() {
+ if (_propertyType.equals(PropertyType.CUSTOMIZEDVIEW)){
+ CustomizedViewRoutingTable customizedViewRoutingTable =
+ (CustomizedViewRoutingTable) _routingTable;
+ return customizedViewRoutingTable.geCustomizedViews();
+ }
+ return Collections.emptySet();
+ }
+
+ /**
+ * Returns the PropertyType associated with this RoutingTableSnapshot
+ * @return
+ */
+ public PropertyType getPropertyType() {
+ return _propertyType;
+ }
+
+ /**
+ * Return the Type associated with the RoutingTableSnapshot (mainly used for CustomizedView)
+ * @return
+ */
+ public String getCustomizedStateType() {
+ return _stateType;
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 7cda4a0..af816a1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -147,7 +147,7 @@ public class DummyClusterManager implements HelixManager {
}
@Override
- public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String aggregationType) throws Exception {
+ public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType) throws Exception {
// TODO Auto-generated method stub
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
index 6dc036a..d1a2b4c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
@@ -1,7 +1,9 @@
package org.apache.helix.integration.spectator;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -9,15 +11,22 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
import org.apache.helix.api.listeners.RoutingTableChangeListener;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.spectator.RoutingTableProvider;
@@ -52,6 +61,9 @@ public class TestRoutingTableProvider extends ZkTestBase {
private RoutingTableProvider _routingTableProvider_cs;
private boolean _listenerTestResult = true;
+
+ private static final AtomicBoolean customizedViewChangeCalled = new AtomicBoolean(false);
+
class MockRoutingTableChangeListener implements RoutingTableChangeListener {
@Override
public void onRoutingTableChange(RoutingTableSnapshot routingTableSnapshot, Object context) {
@@ -73,6 +85,19 @@ public class TestRoutingTableProvider extends ZkTestBase {
}
}
+ class MockRoutingTableProvider extends RoutingTableProvider {
+ MockRoutingTableProvider(HelixManager helixManager, Map<PropertyType, List<String>> sourceDataTypes) {
+ super(helixManager, sourceDataTypes);
+ }
+
+ @Override
+ public void onCustomizedViewChange(List<CustomizedView> customizedViewList,
+ NotificationContext changeContext){
+ customizedViewChangeCalled.getAndSet(true);
+ super.onCustomizedViewChange(customizedViewList, changeContext);
+ }
+ }
+
@BeforeClass
public void beforeClass() throws Exception {
System.out.println(
@@ -212,6 +237,90 @@ public class TestRoutingTableProvider extends ZkTestBase {
Sets.newSet(_instances.get(2)));
}
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testShutdownInstance")
+ public void testExternalViewWithType() {
+ Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
+ sourceDataTypes.put(PropertyType.EXTERNALVIEW, Arrays.asList("typeA"));
+ sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA", "typeB"));
+ RoutingTableProvider routingTableProvider;
+ routingTableProvider = new RoutingTableProvider(_spectator, sourceDataTypes);
+ }
+
+ @Test(dependsOnMethods = "testExternalViewWithType", expectedExceptions = HelixException.class)
+ public void testCustomizedViewWithoutType() {
+ RoutingTableProvider routingTableProvider;
+ routingTableProvider = new RoutingTableProvider(_spectator, PropertyType.CUSTOMIZEDVIEW);
+ }
+
+ @Test(dependsOnMethods = "testCustomizedViewWithoutType")
+ public void testCustomizedViewCorrectConstructor() throws Exception {
+ Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
+ sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA"));
+ MockRoutingTableProvider routingTableProvider =
+ new MockRoutingTableProvider(_spectator, sourceDataTypes);
+
+ CustomizedView customizedView = new CustomizedView(TEST_DB);
+ customizedView.setState("p1", "h1", "testState");
+
+ // Clear the flag before writing to the Customized View Path
+ customizedViewChangeCalled.getAndSet(false);
+ String customizedViewPath = PropertyPathBuilder.customizedView(CLUSTER_NAME, "typeA", TEST_DB);
+ _spectator.getHelixDataAccessor().getBaseDataAccessor().set(customizedViewPath,
+ customizedView.getRecord(), AccessOption.PERSISTENT);
+
+ boolean onCustomizedViewChangeCalled =
+ TestHelper.verify(() -> customizedViewChangeCalled.get(), TestHelper.WAIT_DURATION);
+ Assert.assertTrue(onCustomizedViewChangeCalled);
+
+ _spectator.getHelixDataAccessor().getBaseDataAccessor().remove(customizedViewPath,
+ AccessOption.PERSISTENT);
+ routingTableProvider.shutdown();
+ }
+
+ @Test(dependsOnMethods = "testCustomizedViewCorrectConstructor")
+ public void testGetRoutingTableSnapshot() {
+ Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
+ sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA", "typeB"));
+ sourceDataTypes.put(PropertyType.EXTERNALVIEW, Collections.emptyList());
+ RoutingTableProvider routingTableProvider =
+ new RoutingTableProvider(_spectator, sourceDataTypes);
+
+ CustomizedView customizedView1 = new CustomizedView("Resource1");
+ customizedView1.setState("p1", "h1", "testState1");
+ customizedView1.setState("p1", "h2", "testState1");
+ customizedView1.setState("p2", "h1", "testState2");
+ customizedView1.setState("p3", "h2", "testState3");
+ String customizedViewPath1 =
+ PropertyPathBuilder.customizedView(CLUSTER_NAME, "typeA", "Resource1");
+
+ CustomizedView customizedView2 = new CustomizedView("Resource2");
+ customizedView2.setState("p1", "h3", "testState3");
+ customizedView2.setState("p1", "h4", "testState2");
+ String customizedViewPath2 =
+ PropertyPathBuilder.customizedView(CLUSTER_NAME, "typeB", "Resource2");
+
+ _spectator.getHelixDataAccessor().getBaseDataAccessor().set(customizedViewPath1,
+ customizedView1.getRecord(), AccessOption.PERSISTENT);
+ _spectator.getHelixDataAccessor().getBaseDataAccessor().set(customizedViewPath2,
+ customizedView2.getRecord(), AccessOption.PERSISTENT);
+
+ RoutingTableSnapshot routingTableSnapshot =
+ routingTableProvider.getRoutingTableSnapshot(PropertyType.CUSTOMIZEDVIEW, "typeA");
+ Assert.assertEquals(routingTableSnapshot.getPropertyType(), PropertyType.CUSTOMIZEDVIEW);
+ Assert.assertEquals(routingTableSnapshot.getCustomizedStateType(), "typeA");
+ routingTableSnapshot =
+ routingTableProvider.getRoutingTableSnapshot(PropertyType.CUSTOMIZEDVIEW, "typeB");
+ Assert.assertEquals(routingTableSnapshot.getPropertyType(), PropertyType.CUSTOMIZEDVIEW);
+ Assert.assertEquals(routingTableSnapshot.getCustomizedStateType(), "typeB");
+
+ Map<String, Map<String, RoutingTableSnapshot>> routingTableSnapshots =
+ routingTableProvider.getRoutingTableSnapshots();
+ Assert.assertEquals(routingTableSnapshots.size(), 2);
+ Assert.assertEquals(routingTableSnapshots.get(PropertyType.CUSTOMIZEDVIEW.name()).size(), 2);
+ routingTableProvider.shutdown();
+ }
+
+
private void validateRoutingTable(RoutingTableProvider routingTableProvider,
Set<String> masterNodes, Set<String> slaveNodes) {
IdealState is =
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
index fdce32e..776fb95 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
@@ -2,6 +2,7 @@ package org.apache.helix.integration.spectator;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -16,6 +17,7 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -144,9 +146,10 @@ public class TestRoutingTableProviderPeriodicRefresh extends ZkTestBase {
}
@Override
- protected synchronized void refresh(List<ExternalView> externalViewList,
- NotificationContext changeContext) {
- super.refresh(externalViewList, changeContext);
+ protected synchronized void refreshExternalView(Collection<ExternalView> externalViews,
+ Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+ String referenceKey) {
+ super.refreshExternalView(externalViews, instanceConfigs, liveInstances, referenceKey);
_refreshCount++;
if (DEBUG) {
print();
@@ -154,20 +157,11 @@ public class TestRoutingTableProviderPeriodicRefresh extends ZkTestBase {
}
@Override
- protected synchronized void refresh(Collection<ExternalView> externalViews,
- Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
- super.refresh(externalViews, instanceConfigs, liveInstances);
- _refreshCount++;
- if (DEBUG) {
- print();
- }
- }
-
- @Override
- protected synchronized void refresh(
+ protected synchronized void refreshCurrentState(
Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
- Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
- super.refresh(currentStateMap, instanceConfigs, liveInstances);
+ Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+ String referenceKey) {
+ super.refreshCurrentState(currentStateMap, instanceConfigs, liveInstances, "Test");
_refreshCount++;
if (DEBUG) {
print();
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
index 4e95b39..1371a46 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
@@ -151,7 +151,7 @@ public class MockManager implements HelixManager {
}
@Override
- public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String aggregationType) {
+ public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType) {
// TODO Auto-generated method stub
}
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 3a9b617..fbd8599 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -183,7 +183,7 @@ public class MockZKHelixManager implements HelixManager {
}
@Override
- public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String aggregationType) throws Exception {
+ public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType) throws Exception {
// TODO Auto-generated method stub
}