You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/20 18:39:53 UTC
[1/2] helix git commit: Enable our scripts to be able to pump log
lines into the console.
Repository: helix
Updated Branches:
refs/heads/master 0e8490353 -> b3ee27d4a
Enable our scripts to be able to pump log lines into the console.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b3ee27d4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b3ee27d4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b3ee27d4
Branch: refs/heads/master
Commit: b3ee27d4a663d4b586191bc1b23e9c0d90cf6a1f
Parents: ae02b58
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Feb 8 10:51:31 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Mar 20 11:31:47 2018 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 1 +
helix-core/src/main/config/log4j.properties | 21 ++++++++++-----------
2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/b3ee27d4/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index b94a108..b6a2539 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -189,6 +189,7 @@ under the License.
<groupId>org.codehaus.mojo</groupId>
<artifactId>appassembler-maven-plugin</artifactId>
<configuration>
+ <extraJvmArguments>-Dlog4j.configuration=file://"$BASEDIR"/conf/log4j.properties</extraJvmArguments>
<programs>
<program>
<mainClass>org.apache.helix.controller.HelixControllerMain</mainClass>
http://git-wip-us.apache.org/repos/asf/helix/blob/b3ee27d4/helix-core/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/helix-core/src/main/config/log4j.properties b/helix-core/src/main/config/log4j.properties
index 4b3dc31..5c183a1 100644
--- a/helix-core/src/main/config/log4j.properties
+++ b/helix-core/src/main/config/log4j.properties
@@ -16,16 +16,15 @@
# specific language governing permissions and limitations
# under the License.
#
+# Set root logger level to INFO and its only appender to console.
+log4j.rootLogger=INFO, console
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=ERROR,A1
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Target=System.out
+log4j.appender.console.threshold=INFO
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d] [%-5p] [%t] [%c:%L] - %m%n
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-log4j.logger.org.I0Itec=ERROR
-log4j.logger.org.apache=ERROR
+log4j.logger.org.I0Itec=INFO
+log4j.logger.org.apache=INFO
+log4j.logger.org.apache.helix=INFO
[2/2] helix git commit: Refactor ClusterDataCache,
break it into small cache components, including CurrentStateCache,
InstanceMessageCache and TaskDataCache,
and put the refresh logic into each cache component itself.
Posted by jx...@apache.org.
Refactor ClusterDataCache, break it into small cache components, including CurrentStateCache, InstanceMessageCache and TaskDataCache, and put the refresh logic into each cache component itself.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ae02b58d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ae02b58d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ae02b58d
Branch: refs/heads/master
Commit: ae02b58dc4880fe1f63bc66e0fdf93a63fd1d03f
Parents: 0e84903
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Feb 5 17:26:24 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Tue Mar 20 11:31:47 2018 -0700
----------------------------------------------------------------------
.../helix/common/BasicClusterDataCache.java | 180 ---------
.../common/caches/BasicClusterDataCache.java | 199 +++++++++
.../helix/common/caches/CurrentStateCache.java | 206 ++++++++++
.../common/caches/InstanceMessagesCache.java | 222 ++++++++++
.../helix/common/caches/TaskDataCache.java | 232 +++++++++++
.../controller/stages/ClusterDataCache.java | 402 +++----------------
.../helix/spectator/RoutingDataCache.java | 2 +-
7 files changed, 912 insertions(+), 531 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
deleted file mode 100644
index d48cfbb..0000000
--- a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
+++ /dev/null
@@ -1,180 +0,0 @@
-package org.apache.helix.common;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.HelixConstants;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyType;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Cache the cluster data
- */
-public class BasicClusterDataCache {
- protected final Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
-
- private Map<String, LiveInstance> _liveInstanceMap;
- private Map<String, InstanceConfig> _instanceConfigMap;
- private Map<String, ExternalView> _externalViewMap;
- private final PropertyType _sourceDataType;
-
- protected String _clusterName;
-
- protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
-
- public BasicClusterDataCache(String clusterName) {
- this(clusterName, PropertyType.EXTERNALVIEW);
- }
-
- public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) {
- _propertyDataChangedMap = new ConcurrentHashMap<>();
- _liveInstanceMap = new HashMap<>();
- _instanceConfigMap = new HashMap<>();
- _externalViewMap = new HashMap<>();
- _clusterName = clusterName;
- _sourceDataType = sourceDataType;
- }
-
- /**
- * This refreshes the cluster data by re-fetching the data from zookeeper in an efficient way
- *
- * @param accessor
- *
- * @return
- */
- public synchronized void refresh(HelixDataAccessor accessor) {
- LOG.info("START: ClusterDataCache.refresh() for cluster " + _clusterName);
- long startTime = System.currentTimeMillis();
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
- if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
- long start = System.currentTimeMillis();
- _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false));
- switch (_sourceDataType) {
- case EXTERNALVIEW:
- _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
- break;
- case TARGETEXTERNALVIEW:
- _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews());
- break;
- default:
- break;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
- System.currentTimeMillis() - start) + " ms");
- }
- }
-
- if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
- _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
- _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
- LOG.debug("Reload LiveInstances: " + _liveInstanceMap.keySet());
- }
-
- if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
- _propertyDataChangedMap
- .put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
- _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
- LOG.debug("Reload InstanceConfig: " + _instanceConfigMap.keySet());
- }
-
- long endTime = System.currentTimeMillis();
- LOG.info(
- "END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
- - startTime) + " ms");
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("LiveInstances: " + _liveInstanceMap.keySet());
- for (LiveInstance instance : _liveInstanceMap.values()) {
- LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
- }
- LOG.debug("ExternalViews: " + _externalViewMap.keySet());
- LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet());
- }
- }
-
- /**
- * Retrieves the ExternalView for all resources
- *
- * @return
- */
- public Map<String, ExternalView> getExternalViews() {
- return Collections.unmodifiableMap(_externalViewMap);
- }
-
- /**
- * Returns the LiveInstances for each of the instances that are curretnly up and running
- *
- * @return
- */
- public Map<String, LiveInstance> getLiveInstances() {
- return Collections.unmodifiableMap(_liveInstanceMap);
- }
-
- /**
- * Returns the instance config map
- *
- * @return
- */
- public Map<String, InstanceConfig> getInstanceConfigMap() {
- return Collections.unmodifiableMap(_instanceConfigMap);
- }
-
- /**
- * Notify the cache that some part of the cluster data has been changed.
- */
- public synchronized void notifyDataChange(HelixConstants.ChangeType changeType) {
- _propertyDataChangedMap.put(changeType, Boolean.valueOf(true));
- }
-
- /**
- * Indicate that a full read should be done on the next refresh
- */
- public synchronized void requireFullRefresh() {
- for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
- _propertyDataChangedMap.put(type, Boolean.valueOf(true));
- }
- }
-
- /**
- * toString method to print the data cache state
- */
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
- sb.append("externalViewMap:" + _externalViewMap).append("\n");
- sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
-
- return sb.toString();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
new file mode 100644
index 0000000..f470272
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -0,0 +1,199 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache the basic cluster data, including LiveInstances, InstanceConfigs and ExternalViews.
+ */
+public class BasicClusterDataCache {
+ protected final Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+
+ private Map<String, LiveInstance> _liveInstanceMap;
+ private Map<String, InstanceConfig> _instanceConfigMap;
+ private Map<String, ExternalView> _externalViewMap;
+ private final PropertyType _sourceDataType;
+
+ protected String _clusterName;
+
+ protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
+
+ public BasicClusterDataCache(String clusterName) {
+ this(clusterName, PropertyType.EXTERNALVIEW);
+ }
+
+ public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) {
+ _propertyDataChangedMap = new ConcurrentHashMap<>();
+ _liveInstanceMap = new HashMap<>();
+ _instanceConfigMap = new HashMap<>();
+ _externalViewMap = new HashMap<>();
+ _clusterName = clusterName;
+ _sourceDataType = sourceDataType;
+ }
+
+ /**
+ * This refreshes the cluster data by re-fetching the data from zookeeper in an efficient way
+ *
+ * @param accessor
+ *
+ * @return
+ */
+ public synchronized void refresh(HelixDataAccessor accessor) {
+ LOG.info("START: ClusterDataCache.refresh() for cluster " + _clusterName);
+ long startTime = System.currentTimeMillis();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
+ long start = System.currentTimeMillis();
+ _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false));
+ switch (_sourceDataType) {
+ case EXTERNALVIEW:
+ _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
+ break;
+ case TARGETEXTERNALVIEW:
+ _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews());
+ break;
+ default:
+ break;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
+ System.currentTimeMillis() - start) + " ms");
+ }
+ }
+
+ if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
+ _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
+ _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+ LOG.debug("Reload LiveInstances: " + _liveInstanceMap.keySet());
+ }
+
+ if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
+ _propertyDataChangedMap
+ .put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
+ _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+ LOG.debug("Reload InstanceConfig: " + _instanceConfigMap.keySet());
+ }
+
+ long endTime = System.currentTimeMillis();
+ LOG.info(
+ "END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
+ - startTime) + " ms");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LiveInstances: " + _liveInstanceMap.keySet());
+ for (LiveInstance instance : _liveInstanceMap.values()) {
+ LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
+ }
+ LOG.debug("ExternalViews: " + _externalViewMap.keySet());
+ LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet());
+ }
+ }
+
+ /**
+ * Retrieves the ExternalView for all resources
+ *
+ * @return
+ */
+ public Map<String, ExternalView> getExternalViews() {
+ return Collections.unmodifiableMap(_externalViewMap);
+ }
+
+ /**
+ * Returns the LiveInstances for each of the instances that are currently up and running
+ *
+ * @return
+ */
+ public Map<String, LiveInstance> getLiveInstances() {
+ return Collections.unmodifiableMap(_liveInstanceMap);
+ }
+
+ /**
+ * Returns the instance config map
+ *
+ * @return
+ */
+ public Map<String, InstanceConfig> getInstanceConfigMap() {
+ return Collections.unmodifiableMap(_instanceConfigMap);
+ }
+
+ /**
+ * Notify the cache that some part of the cluster data has been changed.
+ */
+ public synchronized void notifyDataChange(HelixConstants.ChangeType changeType) {
+ _propertyDataChangedMap.put(changeType, Boolean.valueOf(true));
+ }
+
+ /**
+ * Clear the corresponding cache based on change type
+ * @param changeType
+ */
+ public synchronized void clearCache(HelixConstants.ChangeType changeType) {
+ switch (changeType) {
+ case LIVE_INSTANCE:
+ _liveInstanceMap.clear();
+ break;
+ case INSTANCE_CONFIG:
+ _instanceConfigMap.clear();
+ break;
+ case EXTERNAL_VIEW:
+ _externalViewMap.clear();
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Indicate that a full read should be done on the next refresh
+ */
+ public synchronized void requireFullRefresh() {
+ for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
+ _propertyDataChangedMap.put(type, Boolean.valueOf(true));
+ }
+ }
+
+ /**
+ * toString method to print the data cache state
+ */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n");
+ sb.append("externalViewMap:" + _externalViewMap).append("\n");
+ sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
+
+ return sb.toString();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
new file mode 100644
index 0000000..d921512
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -0,0 +1,206 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache to hold all CurrentStates of a cluster.
+ */
+public class CurrentStateCache {
+ private static final Logger LOG = LoggerFactory.getLogger(CurrentStateCache.class.getName());
+
+ private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
+ private Map<PropertyKey, CurrentState> _currentStateCache = Maps.newHashMap();
+
+ private String _clusterName;
+
+ public CurrentStateCache(String clusterName) {
+ _clusterName = clusterName;
+ _currentStateMap = Collections.emptyMap();
+ }
+
+ /**
+ * This refreshes the CurrentStates data by re-fetching the data from zookeeper in an efficient
+ * way
+ *
+ * @param accessor
+ * @param liveInstanceMap map of all liveInstances in cluster
+ *
+ * @return
+ */
+ public synchronized boolean refresh(HelixDataAccessor accessor,
+ Map<String, LiveInstance> liveInstanceMap) {
+ LOG.info("START: CurrentStateCache.refresh()");
+ long startTime = System.currentTimeMillis();
+
+ refreshCurrentStatesCache(accessor, liveInstanceMap);
+
+ Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap = new HashMap<>();
+ for (PropertyKey key : _currentStateCache.keySet()) {
+ CurrentState currentState = _currentStateCache.get(key);
+ String[] params = key.getParams();
+ if (currentState != null && params.length >= 4) {
+ String instanceName = params[1];
+ String sessionId = params[2];
+ String stateName = params[3];
+ Map<String, Map<String, CurrentState>> instanceCurStateMap =
+ allCurStateMap.get(instanceName);
+ if (instanceCurStateMap == null) {
+ instanceCurStateMap = Maps.newHashMap();
+ allCurStateMap.put(instanceName, instanceCurStateMap);
+ }
+ Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId);
+ if (sessionCurStateMap == null) {
+ sessionCurStateMap = Maps.newHashMap();
+ instanceCurStateMap.put(sessionId, sessionCurStateMap);
+ }
+ sessionCurStateMap.put(stateName, currentState);
+ }
+ }
+
+ for (String instance : allCurStateMap.keySet()) {
+ allCurStateMap.put(instance, Collections.unmodifiableMap(allCurStateMap.get(instance)));
+ }
+ _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
+
+ long endTime = System.currentTimeMillis();
+ LOG.info("END: CurrentStateCache.refresh() for cluster " + _clusterName + ", took " + (endTime
+ - startTime) + " ms");
+ return true;
+ }
+
+ // Reload the current states that have changed in ZK into the local cache.
+ private void refreshCurrentStatesCache(HelixDataAccessor accessor,
+ Map<String, LiveInstance> liveInstanceMap) {
+ long start = System.currentTimeMillis();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ List<PropertyKey> currentStateKeys = Lists.newLinkedList();
+ for (String instanceName : liveInstanceMap.keySet()) {
+ LiveInstance liveInstance = liveInstanceMap.get(instanceName);
+ String sessionId = liveInstance.getSessionId();
+ List<String> currentStateNames =
+ accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+ for (String currentStateName : currentStateNames) {
+ currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName));
+ }
+ }
+
+ // All new entries from zk not cached locally yet should be read from ZK.
+ List<PropertyKey> reloadKeys = Lists.newLinkedList(currentStateKeys);
+ reloadKeys.removeAll(_currentStateCache.keySet());
+
+ List<PropertyKey> cachedKeys = Lists.newLinkedList(_currentStateCache.keySet());
+ cachedKeys.retainAll(currentStateKeys);
+
+ List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys);
+ Map<PropertyKey, CurrentState> currentStatesMap = Maps.newHashMap();
+ for (int i = 0; i < cachedKeys.size(); i++) {
+ PropertyKey key = cachedKeys.get(i);
+ HelixProperty.Stat stat = stats.get(i);
+ if (stat != null) {
+ CurrentState property = _currentStateCache.get(key);
+ if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) {
+ currentStatesMap.put(key, property);
+ } else {
+ // need update from zk
+ reloadKeys.add(key);
+ }
+ } else {
+ LOG.debug("stat is null for key: " + key);
+ reloadKeys.add(key);
+ }
+ }
+
+ List<CurrentState> currentStates = accessor.getProperty(reloadKeys);
+ Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
+ for (CurrentState currentState : currentStates) {
+ PropertyKey key = csKeyIter.next();
+ if (currentState != null) {
+ currentStatesMap.put(key, currentState);
+ } else {
+ LOG.debug("CurrentState null for key: " + key);
+ }
+ }
+
+ _currentStateCache = Collections.unmodifiableMap(currentStatesMap);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("# of CurrentState paths read from ZooKeeper " + reloadKeys.size());
+ LOG.debug("# of CurrentState paths skipped reading from ZK: " + (currentStateKeys.size()
+ - reloadKeys.size()));
+ }
+ LOG.info("Takes " + (System.currentTimeMillis() - start) + " ms to reload new current states!");
+ }
+
+ /**
+ * Return CurrentStates map for all instances.
+ *
+ * @return
+ */
+ public Map<String, Map<String, Map<String, CurrentState>>> getCurrentStatesMap() {
+ return Collections.unmodifiableMap(_currentStateMap);
+ }
+
+ /**
+ * Return all CurrentState on the given instance.
+ *
+ * @param instance
+ *
+ * @return
+ */
+ public Map<String, Map<String, CurrentState>> getCurrentStates(String instance) {
+ if (!_currentStateMap.containsKey(instance)) {
+ return Collections.emptyMap();
+ }
+ return Collections.unmodifiableMap(_currentStateMap.get(instance));
+ }
+
+ /**
+ * Provides the current state of the node for a given session id; the session id can be obtained
+ * from the LiveInstance.
+ *
+ * @param instance
+ * @param clientSessionId
+ *
+ * @return
+ */
+ public Map<String, CurrentState> getCurrentState(String instance, String clientSessionId) {
+ if (!_currentStateMap.containsKey(instance) || !_currentStateMap.get(instance)
+ .containsKey(clientSessionId)) {
+ return Collections.emptyMap();
+ }
+ return Collections.unmodifiableMap(_currentStateMap.get(instance).get(clientSessionId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
new file mode 100644
index 0000000..9ac40c3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -0,0 +1,222 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache for holding pending messages in all instances in the given cluster.
+ */
+public class InstanceMessagesCache {
+ private static final Logger LOG = LoggerFactory.getLogger(InstanceMessagesCache.class.getName());
+ private Map<String, Map<String, Message>> _messageMap;
+
+ // maintain a cache of participant messages across pipeline runs
+ private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+ private String _clusterName;
+
+ public InstanceMessagesCache(String clusterName) {
+ _clusterName = clusterName;
+ }
+
+ /**
+ * This refreshes all pending messages in the cluster by re-fetching the data from zookeeper in an
+ * efficient way.
+ * Current state must be refreshed before refreshing relay messages because we need to use current
+ * state to validate all relay messages.
+ *
+ * @param accessor
+ * @param liveInstanceMap
+ *
+ * @return
+ */
+ public synchronized boolean refresh(HelixDataAccessor accessor,
+ Map<String, LiveInstance> liveInstanceMap) {
+ LOG.info("START: InstanceMessagesCache.refresh()");
+ long startTime = System.currentTimeMillis();
+
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ Map<String, Map<String, Message>> msgMap = new HashMap<>();
+ List<PropertyKey> newMessageKeys = Lists.newLinkedList();
+ long purgeSum = 0;
+ for (String instanceName : liveInstanceMap.keySet()) {
+ // get the cache
+ Map<String, Message> cachedMap = _messageCache.get(instanceName);
+ if (cachedMap == null) {
+ cachedMap = Maps.newHashMap();
+ _messageCache.put(instanceName, cachedMap);
+ }
+ msgMap.put(instanceName, cachedMap);
+
+ // get the current names
+ Set<String> messageNames =
+ Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName)));
+
+ long purgeStart = System.currentTimeMillis();
+ // clear stale names
+ Iterator<String> cachedNamesIter = cachedMap.keySet().iterator();
+ while (cachedNamesIter.hasNext()) {
+ String messageName = cachedNamesIter.next();
+ if (!messageNames.contains(messageName)) {
+ cachedNamesIter.remove();
+ }
+ }
+ long purgeEnd = System.currentTimeMillis();
+ purgeSum += purgeEnd - purgeStart;
+
+ // get the keys for the new messages
+ for (String messageName : messageNames) {
+ if (!cachedMap.containsKey(messageName)) {
+ newMessageKeys.add(keyBuilder.message(instanceName, messageName));
+ }
+ }
+ }
+
+ // get the new messages
+ if (newMessageKeys.size() > 0) {
+ List<Message> newMessages = accessor.getProperty(newMessageKeys);
+ for (Message message : newMessages) {
+ if (message != null) {
+ Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());
+ cachedMap.put(message.getId(), message);
+ }
+ }
+ }
+
+ _messageMap = Collections.unmodifiableMap(msgMap);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Message purge took: " + purgeSum);
+ LOG.debug("# of Messages read from ZooKeeper " + newMessageKeys.size() + ". took " + (
+ System.currentTimeMillis() - startTime) + " ms.");
+ }
+
+ return true;
+ }
+
+ // Add all valid relay messages attached to existing state transition messages to the message map.
+ public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap,
+ Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
+ List<Message> relayMessages = new ArrayList<>();
+ for (String instance : _messageMap.keySet()) {
+ Map<String, Message> instanceMessages = _messageMap.get(instance);
+ Map<String, Map<String, CurrentState>> instanceCurrentStateMap =
+ currentStateMap.get(instance);
+ if (instanceCurrentStateMap == null) {
+ continue;
+ }
+
+ for (Message message : instanceMessages.values()) {
+ if (message.hasRelayMessages()) {
+ String sessionId = message.getTgtSessionId();
+ String resourceName = message.getResourceName();
+ String partitionName = message.getPartitionName();
+ String targetState = message.getToState();
+ String instanceSessionId = liveInstanceMap.get(instance).getSessionId();
+
+ if (!instanceSessionId.equals(sessionId)) {
+ continue;
+ }
+
+ Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
+ if (sessionCurrentStateMap == null) {
+ continue;
+ }
+ CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+ if (currentState == null || !targetState.equals(currentState.getState(partitionName))) {
+ continue;
+ }
+ long transitionCompleteTime = currentState.getEndTime(partitionName);
+
+ for (Message msg : message.getRelayMessages().values()) {
+ msg.setRelayTime(transitionCompleteTime);
+ if (!message.isExpired()) {
+ relayMessages.add(msg);
+ }
+ }
+ }
+ }
+ }
+
+ for (Message message : relayMessages) {
+ String instance = message.getTgtName();
+ Map<String, Message> instanceMessages = _messageMap.get(instance);
+ if (instanceMessages == null) {
+ instanceMessages = new HashMap<>();
+ _messageMap.put(instance, instanceMessages);
+ }
+ instanceMessages.put(message.getId(), message);
+ }
+ }
+
+ /**
+ * Provides a list of current outstanding transitions on a given instance.
+ *
+ * @param instanceName
+ *
+ * @return
+ */
+ public Map<String, Message> getMessages(String instanceName) {
+ Map<String, Message> map = _messageMap.get(instanceName);
+ if (map != null) {
+ return map;
+ } else {
+ return Collections.emptyMap();
+ }
+ }
+
+ public void cacheMessages(List<Message> messages) {
+ for (Message message : messages) {
+ String instanceName = message.getTgtName();
+ Map<String, Message> instMsgMap;
+ if (_messageCache.containsKey(instanceName)) {
+ instMsgMap = _messageCache.get(instanceName);
+ } else {
+ instMsgMap = Maps.newHashMap();
+ _messageCache.put(instanceName, instMsgMap);
+ }
+ instMsgMap.put(message.getId(), message);
+ }
+ }
+
+ @Override public String toString() {
+ return "InstanceMessagesCache{" +
+ "_messageMap=" + _messageMap +
+ ", _messageCache=" + _messageCache +
+ ", _clusterName='" + _clusterName + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
new file mode 100644
index 0000000..2dbb4f8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -0,0 +1,232 @@
+package org.apache.helix.common.caches;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache for holding all task related cluster data, such as WorkflowConfig, JobConfig and Contexts.
+ *
+ * <p>Only {@link #refresh} is declared {@code synchronized}; the getters and the context update
+ * methods read and write the underlying maps without locking.
+ */
+public class TaskDataCache {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskDataCache.class);
+  // Simple field of a context record that holds the workflow/job name.
+  private static final String NAME = "NAME";
+
+  private final Map<String, JobConfig> _jobConfigMap = new HashMap<>();
+  private final Map<String, WorkflowConfig> _workflowConfigMap = new HashMap<>();
+  private final Map<String, ZNRecord> _contextMap = new HashMap<>();
+
+  private final String _clusterName;
+
+  /**
+   * Creates an empty cache.
+   *
+   * @param clusterName name of the cluster used to build property store paths; when it is
+   *          {@code null}, refreshing leaves the context map empty
+   */
+  public TaskDataCache(String clusterName) {
+    _clusterName = clusterName;
+  }
+
+  /**
+   * This refreshes the cluster data by re-fetching the data from zookeeper in an efficient way.
+   * Contexts are re-read through the accessor, while workflow and job configs are rebuilt from
+   * the supplied resource configs.
+   *
+   * @param accessor data accessor used to read the contexts
+   * @param resourceConfigMap resource configs keyed by resource name; an entry whose record has
+   *          the {@code Dag} simple field is cached as a workflow config, otherwise an entry with
+   *          the {@code WorkflowID} simple field is cached as a job config, and any other entry
+   *          is ignored
+   *
+   * @return always {@code true}
+   */
+  public synchronized boolean refresh(HelixDataAccessor accessor,
+      Map<String, ResourceConfig> resourceConfigMap) {
+    refreshJobContexts(accessor);
+
+    // update workflow and job configs.
+    _workflowConfigMap.clear();
+    _jobConfigMap.clear();
+    for (Map.Entry<String, ResourceConfig> entry : resourceConfigMap.entrySet()) {
+      ResourceConfig resourceConfig = entry.getValue();
+      Map<String, String> simpleFields = resourceConfig.getRecord().getSimpleFields();
+      if (simpleFields.containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) {
+        _workflowConfigMap.put(entry.getKey(), new WorkflowConfig(resourceConfig));
+      } else if (simpleFields
+          .containsKey(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name())) {
+        _jobConfigMap.put(entry.getKey(), new JobConfig(resourceConfig));
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Clears the context map and reloads every workflow/job context found under the rebalancer
+   * context root of the property store. Returns early, leaving the map empty, when the cluster
+   * name is {@code null} or the child listing comes back {@code null}.
+   *
+   * @param accessor data accessor whose base accessor performs the reads
+   */
+  private void refreshJobContexts(HelixDataAccessor accessor) {
+    // TODO: Optimize by reading contexts only when a refresh is actually needed.
+    long start = System.currentTimeMillis();
+    _contextMap.clear();
+    if (_clusterName == null) {
+      return;
+    }
+    String path = String.format("/%s/%s%s", _clusterName, PropertyType.PROPERTYSTORE.name(),
+        TaskConstants.REBALANCER_CONTEXT_ROOT);
+    List<String> contextPaths = new ArrayList<>();
+    List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
+    if (childNames == null) {
+      return;
+    }
+    for (String context : childNames) {
+      contextPaths.add(Joiner.on("/").join(path, context, TaskConstants.CONTEXT_NODE));
+    }
+
+    // contexts.get(i) is paired with childNames.get(i) below.
+    List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0);
+    for (int i = 0; i < contexts.size(); i++) {
+      ZNRecord context = contexts.get(i);
+      if (context != null && context.getSimpleField(NAME) != null) {
+        _contextMap.put(context.getSimpleField(NAME), context);
+      } else {
+        // Fall back to the child node name as key; the stored value may be null here.
+        _contextMap.put(childNames.get(i), context);
+        // Parameterized logging: nothing is formatted unless debug is enabled.
+        LOG.debug("Context for {} is null or miss the context NAME!", childNames.get(i));
+      }
+    }
+
+    LOG.debug("# of workflow/job context read from zk: {}. Take {} ms", _contextMap.size(),
+        System.currentTimeMillis() - start);
+  }
+
+  /**
+   * Returns job config map
+   *
+   * @return the internal (live, mutable) map of job configs keyed by resource name
+   */
+  public Map<String, JobConfig> getJobConfigMap() {
+    return _jobConfigMap;
+  }
+
+  /**
+   * Returns job config
+   *
+   * @param resource resource name of the job
+   *
+   * @return the cached job config, or {@code null} when none is cached for {@code resource}
+   */
+  public JobConfig getJobConfig(String resource) {
+    return _jobConfigMap.get(resource);
+  }
+
+  /**
+   * Returns workflow config map
+   *
+   * @return the internal (live, mutable) map of workflow configs keyed by resource name
+   */
+  public Map<String, WorkflowConfig> getWorkflowConfigMap() {
+    return _workflowConfigMap;
+  }
+
+  /**
+   * Returns workflow config
+   *
+   * @param resource resource name of the workflow
+   *
+   * @return the cached workflow config, or {@code null} when none is cached for {@code resource}
+   */
+  public WorkflowConfig getWorkflowConfig(String resource) {
+    return _workflowConfigMap.get(resource);
+  }
+
+  /**
+   * Return the JobContext by resource name
+   *
+   * @param resourceName key of the context in the context map
+   *
+   * @return a new {@link JobContext} wrapping the cached record, or {@code null} when no
+   *         non-null record is cached under {@code resourceName}
+   */
+  public JobContext getJobContext(String resourceName) {
+    ZNRecord record = _contextMap.get(resourceName);
+    return record == null ? null : new JobContext(record);
+  }
+
+  /**
+   * Return the WorkflowContext by resource name
+   *
+   * @param resourceName key of the context in the context map
+   *
+   * @return a new {@link WorkflowContext} wrapping the cached record, or {@code null} when no
+   *         non-null record is cached under {@code resourceName}
+   */
+  public WorkflowContext getWorkflowContext(String resourceName) {
+    ZNRecord record = _contextMap.get(resourceName);
+    return record == null ? null : new WorkflowContext(record);
+  }
+
+  /**
+   * Update context of the Job
+   *
+   * @param resourceName resource name of the job
+   * @param jobContext context whose record is written and cached
+   * @param accessor data accessor used for the write
+   */
+  public void updateJobContext(String resourceName, JobContext jobContext,
+      HelixDataAccessor accessor) {
+    updateContext(resourceName, jobContext.getRecord(), accessor);
+  }
+
+  /**
+   * Update context of the Workflow
+   *
+   * @param resourceName resource name of the workflow
+   * @param workflowContext context whose record is written and cached
+   * @param accessor data accessor used for the write
+   */
+  public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext,
+      HelixDataAccessor accessor) {
+    updateContext(resourceName, workflowContext.getRecord(), accessor);
+  }
+
+  /**
+   * Update context of the Workflow or Job: writes the record to its context path through the
+   * base data accessor, then stores it in the context map. The result of the write is not
+   * inspected, so the record is cached regardless of it.
+   */
+  private void updateContext(String resourceName, ZNRecord record, HelixDataAccessor accessor) {
+    String path = String.format("/%s/%s%s/%s/%s", _clusterName, PropertyType.PROPERTYSTORE.name(),
+        TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.CONTEXT_NODE);
+    accessor.getBaseDataAccessor().set(path, record, AccessOption.PERSISTENT);
+    _contextMap.put(resourceName, record);
+  }
+
+  /**
+   * Return map of WorkflowContexts or JobContexts
+   *
+   * @return the internal (live, mutable) context map; values may be {@code null}
+   */
+  public Map<String, ZNRecord> getContexts() {
+    return _contextMap;
+  }
+
+  @Override
+  public String toString() {
+    return "TaskDataCache{" +
+        "_jobConfigMap=" + _jobConfigMap +
+        ", _workflowConfigMap=" + _workflowConfigMap +
+        ", _contextMap=" + _contextMap +
+        ", _clusterName='" + _clusterName + '\'' +
+        '}';
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 47413fc..4fa0c8c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -19,24 +19,23 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-
-import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.CurrentStateCache;
+import org.apache.helix.common.caches.InstanceMessagesCache;
+import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -60,11 +59,6 @@ import org.apache.helix.task.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
import static org.apache.helix.HelixConstants.ChangeType;
/**
@@ -87,26 +81,21 @@ public class ClusterDataCache {
private Map<String, ResourceConfig> _resourceConfigMap;
private Map<String, ResourceConfig> _resourceConfigCacheMap;
private Map<String, ClusterConstraints> _constraintMap;
- private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
- private Map<String, Map<String, Message>> _messageMap;
private Map<String, Map<String, String>> _idealStateRuleMap;
private Map<String, Map<String, Long>> _missingTopStateMap = new HashMap<>();
- private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
- private Map<String, WorkflowConfig> _workflowConfigMap = new HashMap<>();
- private Map<String, ZNRecord> _contextMap = new HashMap<>();
- private Map<String, ExternalView> _targetExternalViewMap = Maps.newHashMap();
+ private Map<String, ExternalView> _targetExternalViewMap = new HashMap<>();
- // maintain a cache of participant messages across pipeline runs
- private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
- private Map<PropertyKey, CurrentState> _currentStateCache = Maps.newHashMap();
+ private CurrentStateCache _currentStateCache;
+ private TaskDataCache _taskDataCache;
+ private InstanceMessagesCache _instanceMessagesCache;
// maintain a cache of bestPossible assignment across pipeline runs
// TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache.
- private Map<String, ResourceAssignment> _resourceAssignmentCache = Maps.newHashMap();
+ private Map<String, ResourceAssignment> _resourceAssignmentCache = new HashMap<>();
// maintain a cache of idealmapping (preference list) for full-auto resource across pipeline runs
- private Map<String, ZNRecord> _idealMappingCache = Maps.newHashMap();
+ private Map<String, ZNRecord> _idealMappingCache = new HashMap<>();
private Map<ChangeType, Boolean> _propertyDataChangedMap;
@@ -122,15 +111,18 @@ public class ClusterDataCache {
private String _clusterName;
public ClusterDataCache () {
+ this(null);
+ }
+
+ public ClusterDataCache(String clusterName) {
_propertyDataChangedMap = new ConcurrentHashMap<>();
- for(ChangeType type : ChangeType.values()) {
+ for (ChangeType type : ChangeType.values()) {
_propertyDataChangedMap.put(type, Boolean.valueOf(true));
}
- }
-
- public ClusterDataCache (String clusterName) {
- this();
_clusterName = clusterName;
+ _currentStateCache = new CurrentStateCache(_clusterName);
+ _taskDataCache = new TaskDataCache(_clusterName);
+ _instanceMessagesCache = new InstanceMessagesCache(_clusterName);
}
/**
@@ -177,18 +169,17 @@ public class ClusterDataCache {
LOG.debug("Reload ResourceConfigs: " + _resourceConfigCacheMap.size());
}
- _idealStateMap = Maps.newHashMap(_idealStateCacheMap);
- _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
+ _idealStateMap = new HashMap<>(_idealStateCacheMap);
+ _liveInstanceMap = new HashMap(_liveInstanceCacheMap);
_instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap);
- _resourceConfigMap = Maps.newHashMap(_resourceConfigCacheMap);
+ _resourceConfigMap = new HashMap(_resourceConfigCacheMap);
if (_updateInstanceOfflineTime) {
updateOfflineInstanceHistory(accessor);
}
if (_isTaskCache) {
- refreshJobContexts(accessor);
- updateWorkflowJobConfigs();
+ _taskDataCache.refresh(accessor, _resourceConfigMap);
}
Map<String, StateModelDefinition> stateDefMap =
@@ -197,17 +188,20 @@ public class ClusterDataCache {
_constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
_clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
- refreshMessages(accessor);
- refreshCurrentStates(accessor);
+ _instanceMessagesCache
+ .refresh(accessor, _liveInstanceMap);
+ _currentStateCache.refresh(accessor, _liveInstanceMap);
+
// current state must be refreshed before refreshing relay messages
// because we need to use current state to validate all relay messages.
- updateRelayMessages(_messageMap);
+ _instanceMessagesCache
+ .updateRelayMessages(_liveInstanceMap, _currentStateCache.getCurrentStatesMap());
if (_clusterConfig != null) {
_idealStateRuleMap = _clusterConfig.getIdealStateRules();
} else {
- _idealStateRuleMap = Maps.newHashMap();
+ _idealStateRuleMap = new HashMap();
LOG.warn("Cluster config is null!");
}
@@ -229,7 +223,7 @@ public class ClusterDataCache {
LOG.debug("ResourceConfigs: " + _resourceConfigMap.keySet());
LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet());
LOG.debug("ClusterConfigs: " + _clusterConfig);
- LOG.debug("JobContexts: " + _contextMap.keySet());
+ LOG.debug("JobContexts: " + _taskDataCache.getContexts().keySet());
}
if (LOG.isTraceEnabled()) {
@@ -239,215 +233,6 @@ public class ClusterDataCache {
return true;
}
- private void refreshMessages(HelixDataAccessor accessor) {
- long start = System.currentTimeMillis();
- Builder keyBuilder = accessor.keyBuilder();
-
- Map<String, Map<String, Message>> msgMap = new HashMap<>();
- List<PropertyKey> newMessageKeys = Lists.newLinkedList();
- long purgeSum = 0;
- for (String instanceName : _liveInstanceMap.keySet()) {
- // get the cache
- Map<String, Message> cachedMap = _messageCache.get(instanceName);
- if (cachedMap == null) {
- cachedMap = Maps.newHashMap();
- _messageCache.put(instanceName, cachedMap);
- }
- msgMap.put(instanceName, cachedMap);
-
- // get the current names
- Set<String> messageNames =
- Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName)));
-
- long purgeStart = System.currentTimeMillis();
- // clear stale names
- Iterator<String> cachedNamesIter = cachedMap.keySet().iterator();
- while (cachedNamesIter.hasNext()) {
- String messageName = cachedNamesIter.next();
- if (!messageNames.contains(messageName)) {
- cachedNamesIter.remove();
- }
- }
- long purgeEnd = System.currentTimeMillis();
- purgeSum += purgeEnd - purgeStart;
-
- // get the keys for the new messages
- for (String messageName : messageNames) {
- if (!cachedMap.containsKey(messageName)) {
- newMessageKeys.add(keyBuilder.message(instanceName, messageName));
- }
- }
- }
-
- // get the new messages
- if (newMessageKeys.size() > 0) {
- List<Message> newMessages = accessor.getProperty(newMessageKeys);
- for (Message message : newMessages) {
- if (message != null) {
- Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());
- cachedMap.put(message.getId(), message);
- }
- }
- }
-
- _messageMap = Collections.unmodifiableMap(msgMap);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Message purge took: " + purgeSum);
- LOG.debug("# of Messages read from ZooKeeper " + newMessageKeys.size() + ". took " + (
- System.currentTimeMillis() - start) + " ms.");
- }
- }
-
- // update all valid relay messages attached to existing state transition messages into message map.
- private void updateRelayMessages(Map<String, Map<String, Message>> messageMap) {
- List<Message> relayMessages = new ArrayList<>();
- for (String instance : messageMap.keySet()) {
- Map<String, Message> instanceMessages = messageMap.get(instance);
- Map<String, Map<String, CurrentState>> instanceCurrentStateMap = _currentStateMap.get(instance);
- if (instanceCurrentStateMap == null) {
- continue;
- }
-
- for (Message message : instanceMessages.values()) {
- if (message.hasRelayMessages()) {
- String sessionId = message.getTgtSessionId();
- String resourceName = message.getResourceName();
- String partitionName = message.getPartitionName();
- String targetState = message.getToState();
- String instanceSessionId = _liveInstanceMap.get(instance).getSessionId();
-
- if (!instanceSessionId.equals(sessionId)) {
- continue;
- }
-
- Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId);
- if (sessionCurrentStateMap == null) {
- continue;
- }
- CurrentState currentState = sessionCurrentStateMap.get(resourceName);
- if (currentState == null || !targetState.equals(currentState.getState(partitionName))) {
- continue;
- }
- long transitionCompleteTime = currentState.getEndTime(partitionName);
-
- for (Message msg : message.getRelayMessages().values()) {
- msg.setRelayTime(transitionCompleteTime);
- if (!message.isExpired()) {
- relayMessages.add(msg);
- }
- }
- }
- }
- }
-
- for (Message message : relayMessages) {
- String instance = message.getTgtName();
- Map<String, Message> instanceMessages = messageMap.get(instance);
- if (instanceMessages == null) {
- instanceMessages = new HashMap<>();
- messageMap.put(instance, instanceMessages);
- }
- instanceMessages.put(message.getId(), message);
- }
- }
-
- private void refreshCurrentStates(HelixDataAccessor accessor) {
- refreshCurrentStatesCache(accessor);
-
- Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap = new HashMap<>();
- for (PropertyKey key : _currentStateCache.keySet()) {
- CurrentState currentState = _currentStateCache.get(key);
- String[] params = key.getParams();
- if (currentState != null && params.length >= 4) {
- String instanceName = params[1];
- String sessionId = params[2];
- String stateName = params[3];
- Map<String, Map<String, CurrentState>> instanceCurStateMap =
- allCurStateMap.get(instanceName);
- if (instanceCurStateMap == null) {
- instanceCurStateMap = Maps.newHashMap();
- allCurStateMap.put(instanceName, instanceCurStateMap);
- }
- Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId);
- if (sessionCurStateMap == null) {
- sessionCurStateMap = Maps.newHashMap();
- instanceCurStateMap.put(sessionId, sessionCurStateMap);
- }
- sessionCurStateMap.put(stateName, currentState);
- }
- }
-
- for (String instance : allCurStateMap.keySet()) {
- allCurStateMap.put(instance, Collections.unmodifiableMap(allCurStateMap.get(instance)));
- }
- _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
- }
-
- // reload current states that has been changed from zk to local cache.
- private void refreshCurrentStatesCache(HelixDataAccessor accessor) {
- long start = System.currentTimeMillis();
- Builder keyBuilder = accessor.keyBuilder();
-
- List<PropertyKey> currentStateKeys = Lists.newLinkedList();
- for (String instanceName : _liveInstanceMap.keySet()) {
- LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
- String sessionId = liveInstance.getSessionId();
- List<String> currentStateNames =
- accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
- for (String currentStateName : currentStateNames) {
- currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName));
- }
- }
-
- // All new entries from zk not cached locally yet should be read from ZK.
- List<PropertyKey> reloadKeys = Lists.newLinkedList(currentStateKeys);
- reloadKeys.removeAll(_currentStateCache.keySet());
-
- List<PropertyKey> cachedKeys = Lists.newLinkedList(_currentStateCache.keySet());
- cachedKeys.retainAll(currentStateKeys);
-
- List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys);
- Map<PropertyKey, CurrentState> currentStatesMap = Maps.newHashMap();
- for (int i=0; i < cachedKeys.size(); i++) {
- PropertyKey key = cachedKeys.get(i);
- HelixProperty.Stat stat = stats.get(i);
- if (stat != null) {
- CurrentState property = _currentStateCache.get(key);
- if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) {
- currentStatesMap.put(key, property);
- } else {
- // need update from zk
- reloadKeys.add(key);
- }
- } else {
- LOG.debug("stat is null for key: " + key);
- reloadKeys.add(key);
- }
- }
-
- List<CurrentState> currentStates = accessor.getProperty(reloadKeys);
- Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
- for (CurrentState currentState : currentStates) {
- PropertyKey key = csKeyIter.next();
- if (currentState != null) {
- currentStatesMap.put(key, currentState);
- } else {
- LOG.debug("CurrentState null for key: " + key);
- }
- }
-
- _currentStateCache = Collections.unmodifiableMap(currentStatesMap);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("# of CurrentState paths read from ZooKeeper " + reloadKeys.size());
- LOG.debug(
- "# of CurrentState paths skipped reading from ZK: " + (currentStateKeys.size() - reloadKeys.size()));
- }
- LOG.info(
- "Takes " + (System.currentTimeMillis() - start) + " ms to reload new current states!");
- }
-
private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
List<String> offlineNodes = new ArrayList<>(_instanceConfigMap.keySet());
offlineNodes.removeAll(_liveInstanceMap.keySet());
@@ -502,7 +287,7 @@ public class ClusterDataCache {
}
public synchronized void setIdealStates(List<IdealState> idealStates) {
- Map<String, IdealState> idealStateMap = Maps.newHashMap();
+ Map<String, IdealState> idealStateMap = new HashMap();
for (IdealState idealState : idealStates) {
idealStateMap.put(idealState.getId(), idealState);
}
@@ -588,7 +373,7 @@ public class ClusterDataCache {
public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
- Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap();
+ Map<String, LiveInstance> liveInstanceMap = new HashMap();
for (LiveInstance liveInstance : liveInstances) {
liveInstanceMap.put(liveInstance.getId(), liveInstance);
}
@@ -597,18 +382,16 @@ public class ClusterDataCache {
}
/**
- * Provides the current state of the node for a given session id,
- * the sessionid can be got from LiveInstance
+ * Provides the current state of the node for a given session id, the sessionid can be got from
+ * LiveInstance
+ *
* @param instanceName
* @param clientSessionId
+ *
* @return
*/
public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) {
- if (!_currentStateMap.containsKey(instanceName)
- || !_currentStateMap.get(instanceName).containsKey(clientSessionId)) {
- return Collections.emptyMap();
- }
- return _currentStateMap.get(instanceName).get(clientSessionId);
+ return _currentStateCache.getCurrentState(instanceName, clientSessionId);
}
/**
@@ -617,26 +400,11 @@ public class ClusterDataCache {
* @return
*/
public Map<String, Message> getMessages(String instanceName) {
- Map<String, Message> map = _messageMap.get(instanceName);
- if (map != null) {
- return map;
- } else {
- return Collections.emptyMap();
- }
+ return _instanceMessagesCache.getMessages(instanceName);
}
public void cacheMessages(List<Message> messages) {
- for (Message message : messages) {
- String instanceName = message.getTgtName();
- Map<String, Message> instMsgMap;
- if (_messageCache.containsKey(instanceName)) {
- instMsgMap = _messageCache.get(instanceName);
- } else {
- instMsgMap = Maps.newHashMap();
- _messageCache.put(instanceName, instMsgMap);
- }
- instMsgMap.put(message.getId(), message);
- }
+ _instanceMessagesCache.cacheMessages(messages);
}
/**
@@ -721,7 +489,7 @@ public class ClusterDataCache {
* @return
*/
public Map<String, JobConfig> getJobConfigMap() {
- return _jobConfigMap;
+ return _taskDataCache.getJobConfigMap();
}
/**
@@ -730,7 +498,7 @@ public class ClusterDataCache {
* @return
*/
public JobConfig getJobConfig(String resource) {
- return _jobConfigMap.get(resource);
+ return _taskDataCache.getJobConfig(resource);
}
/**
@@ -738,7 +506,7 @@ public class ClusterDataCache {
* @return
*/
public Map<String, WorkflowConfig> getWorkflowConfigMap() {
- return _workflowConfigMap;
+ return _taskDataCache.getWorkflowConfigMap();
}
/**
@@ -747,12 +515,12 @@ public class ClusterDataCache {
* @return
*/
public WorkflowConfig getWorkflowConfig(String resource) {
- return _workflowConfigMap.get(resource);
+ return _taskDataCache.getWorkflowConfig(resource);
}
public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) {
- Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap();
for (InstanceConfig instanceConfig : instanceConfigs) {
instanceConfigMap.put(instanceConfig.getId(), instanceConfig);
}
@@ -881,10 +649,7 @@ public class ClusterDataCache {
* @return
*/
public JobContext getJobContext(String resourceName) {
- if (_contextMap.containsKey(resourceName) && _contextMap.get(resourceName) != null) {
- return new JobContext(_contextMap.get(resourceName));
- }
- return null;
+ return _taskDataCache.getJobContext(resourceName);
}
/**
@@ -893,10 +658,7 @@ public class ClusterDataCache {
* @return
*/
public WorkflowContext getWorkflowContext(String resourceName) {
- if (_contextMap.containsKey(resourceName) && _contextMap.get(resourceName) != null) {
- return new WorkflowContext(_contextMap.get(resourceName));
- }
- return null;
+ return _taskDataCache.getWorkflowContext(resourceName);
}
/**
@@ -904,7 +666,7 @@ public class ClusterDataCache {
*/
public void updateJobContext(String resourceName, JobContext jobContext,
HelixDataAccessor accessor) {
- updateContext(resourceName, jobContext.getRecord(), accessor);
+ _taskDataCache.updateJobContext(resourceName, jobContext, accessor);
}
/**
@@ -912,17 +674,7 @@ public class ClusterDataCache {
*/
public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext,
HelixDataAccessor accessor) {
- updateContext(resourceName, workflowContext.getRecord(), accessor);
- }
-
- /**
- * Update context of the Workflow or Job
- */
- private void updateContext(String resourceName, ZNRecord record, HelixDataAccessor accessor) {
- String path = String.format("/%s/%s%s/%s/%s", _clusterName, PropertyType.PROPERTYSTORE.name(),
- TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.CONTEXT_NODE);
- accessor.getBaseDataAccessor().set(path, record, AccessOption.PERSISTENT);
- _contextMap.put(resourceName, record);
+ _taskDataCache.updateWorkflowContext(resourceName, workflowContext, accessor);
}
/**
@@ -930,7 +682,7 @@ public class ClusterDataCache {
* @return
*/
public Map<String, ZNRecord> getContexts() {
- return _contextMap;
+ return _taskDataCache.getContexts();
}
public ExternalView getTargetExternalView(String resourceName) {
@@ -1066,61 +818,11 @@ public class ClusterDataCache {
sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n");
sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n");
sb.append("resourceConfigMap:" + _resourceConfigMap).append("\n");
- sb.append("jobContextMap:" + _contextMap).append("\n");
- sb.append("messageMap:" + _messageMap).append("\n");
- sb.append("currentStateMap:" + _currentStateMap).append("\n");
+ sb.append("taskDataCache:" + _taskDataCache).append("\n");
+ sb.append("messageCache:" + _instanceMessagesCache).append("\n");
+ sb.append("currentStateCache:" + _currentStateCache).append("\n");
sb.append("clusterConfig:" + _clusterConfig).append("\n");
return sb.toString();
}
-
- private void refreshJobContexts(HelixDataAccessor accessor) {
- // TODO: Need an optimize for reading context only if the refresh is needed.
- long start = System.currentTimeMillis();
- _contextMap.clear();
- if (_clusterName == null) {
- return;
- }
- String path = String.format("/%s/%s%s", _clusterName, PropertyType.PROPERTYSTORE.name(),
- TaskConstants.REBALANCER_CONTEXT_ROOT);
- List<String> contextPaths = new ArrayList<>();
- List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
- if (childNames == null) {
- return;
- }
- for (String context : childNames) {
- contextPaths.add(Joiner.on("/").join(path, context, TaskConstants.CONTEXT_NODE));
- }
-
- List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0);
- for (int i = 0; i < contexts.size(); i++) {
- ZNRecord context = contexts.get(i);
- if (context != null && context.getSimpleField(NAME) != null) {
- _contextMap.put(context.getSimpleField(NAME), context);
- } else {
- _contextMap.put(childNames.get(i), context);
- LOG.debug(
- String.format("Context for %s is null or miss the context NAME!", childNames.get((i))));
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + (
- System.currentTimeMillis() - start) + " ms");
- }
- }
-
- private void updateWorkflowJobConfigs() {
- _workflowConfigMap.clear();
- _jobConfigMap.clear();
- for (Map.Entry<String, ResourceConfig> entry : _resourceConfigMap.entrySet()) {
- if (entry.getValue().getRecord().getSimpleFields()
- .containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) {
- _workflowConfigMap.put(entry.getKey(), new WorkflowConfig(entry.getValue()));
- } else if (entry.getValue().getRecord().getSimpleFields()
- .containsKey(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name())) {
- _jobConfigMap.put(entry.getKey(), new JobConfig(entry.getValue()));
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index 5602333..332cd8a 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -21,7 +21,7 @@ package org.apache.helix.spectator;
import org.apache.helix.HelixConstants;
import org.apache.helix.PropertyType;
-import org.apache.helix.common.BasicClusterDataCache;
+import org.apache.helix.common.caches.BasicClusterDataCache;
/**
* Cache the cluster data that are needed by RoutingTableProvider.