You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/11/27 13:24:35 UTC
ambari git commit: AMBARI-22427. Check if configurations used for
commands are of correct version (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf 54e84d403 -> 45d9ca671
AMBARI-22427. Check if configurations used for commands are of correct version (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/45d9ca67
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/45d9ca67
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/45d9ca67
Branch: refs/heads/branch-3.0-perf
Commit: 45d9ca6717c636f67ce024f1586a0f77413c17a2
Parents: 54e84d4
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon Nov 27 15:24:28 2017 +0200
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon Nov 27 15:24:28 2017 +0200
----------------------------------------------------------------------
.../python/ambari_agent/ConfigurationBuilder.py | 13 ++++++-----
.../ambari_agent/CustomServiceOrchestrator.py | 4 +++-
.../listeners/CommandsEventListener.py | 3 +++
.../listeners/ConfigurationEventListener.py | 2 ++
.../ambari_agent/TestAgentStompResponses.py | 6 ++---
.../stomp/configurations_update.json | 3 ++-
.../dummy_files/stomp/execution_commands.json | 3 ++-
.../agent/stomp/AgentClusterDataHolder.java | 2 +-
.../server/agent/stomp/AgentConfigsHolder.java | 23 ++++++++++++++++++++
.../server/agent/stomp/AgentDataHolder.java | 4 ++--
.../server/agent/stomp/AgentHostDataHolder.java | 10 ++++++---
.../server/events/AgentConfigsUpdateEvent.java | 15 +++++++++++++
.../server/events/ExecutionCommandEvent.java | 14 ++++++++++++
.../publishers/AgentCommandsPublisher.java | 5 +++++
14 files changed, 90 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py b/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
index f8bdb42..1aed2a1 100644
--- a/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
+++ b/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
@@ -27,9 +27,12 @@ class ConfigurationBuilder:
self.topology_cache = initializer_module.topology_cache
self.host_level_params_cache = initializer_module.host_level_params_cache
self.configurations_cache = initializer_module.configurations_cache
-
- def get_configuration(self, cluster_id, service_name, component_name):
- if cluster_id:
+
+ def get_configuration(self, cluster_id, service_name, component_name, configurations_timestamp=None):
+ if cluster_id:
+ if configurations_timestamp and configurations_timestamp != self.configurations_cache.timestamp:
+ raise Exception("Command requires configs with timestamp={0} but configs on agent have timestamp={1}".format(configurations_timestamp, self.configurations_cache.timestamp))
+
metadata_cache = self.metadata_cache[cluster_id]
configurations_cache = self.configurations_cache[cluster_id]
host_level_params_cache = self.host_level_params_cache[cluster_id]
@@ -49,7 +52,7 @@ class ConfigurationBuilder:
if component_name in host_repos.componentRepos:
repo_version_id = host_repos.componentRepos[component_name]
command_dict['repositoryFile'] = host_repos.commandRepos[str(repo_version_id)]
-
+
component_dict = self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name)
if component_dict is not None:
command_dict.update({
@@ -74,7 +77,7 @@ class ConfigurationBuilder:
}
}
return command_dict
-
+
@property
def public_fqdn(self):
hostname.public_hostname(self.config)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 0debb1b..5c0561a 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -461,7 +461,9 @@ class CustomServiceOrchestrator():
service_name = None
component_name = None
- command_dict = self.configuration_builder.get_configuration(cluster_id, service_name, component_name)
+ required_config_timestamp = command_header['requiredConfigTimestamp'] if 'requiredConfigTimestamp' in command_header else None
+
+ command_dict = self.configuration_builder.get_configuration(cluster_id, service_name, component_name, required_config_timestamp)
command = Utils.update_nested(Utils.get_mutable_copy(command_dict), command_header)
return command
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
index ae8d400..dc3bea9 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
@@ -51,6 +51,9 @@ class CommandsEventListener(EventListener):
if 'cancelCommands' in cluster_dict:
cancel_commands += cluster_dict['cancelCommands']
+ for command in commands:
+ command['requiredConfigTimestamp'] = message['requiredConfigTimestamp']
+
with self.action_queue.lock:
self.action_queue.cancel(cancel_commands)
self.action_queue.put(commands)
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
index a4884d8..1c083a6 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
@@ -40,6 +40,8 @@ class ConfigurationEventListener(EventListener):
@param headers: headers dictionary
@param message: message payload dictionary
"""
+ self.configuration_cache.timestamp = message.pop('timestamp')
+
# this kind of response is received if hash was identical. And server does not need to change anything
if message == {}:
return
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index 162a809..39542d1 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -191,7 +191,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body='{}')
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{"timestamp":1510577217}')
self.server.topic_manager.send(f)
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
@@ -242,7 +242,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body='{}')
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{"timestamp":1510577217}')
self.server.topic_manager.send(f)
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
@@ -321,7 +321,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body='{}')
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{"timestamp":1510577217}')
self.server.topic_manager.send(f)
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
index e8e1ab5..48e9e26 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
@@ -1,5 +1,6 @@
{
"hash": "a1a71f4b46feaf72bf33627d78bbdc3e",
+ "timestamp": 1510577217,
"clusters": {
"0":{
"configurations":{
@@ -49,4 +50,4 @@
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
index 76dac1b..ebc2c36 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
@@ -1,4 +1,5 @@
{
+ "requiredConfigTimestamp": 1510577217,
"clusters": {
"0":{
"commands":[
@@ -77,4 +78,4 @@
]
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
index f966386..0b90b6d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
@@ -83,7 +83,7 @@ public abstract class AgentClusterDataHolder<T extends AmbariUpdateEvent & Hasha
protected final void regenerateHash() {
try {
lock.lock();
- regenerateHash(data);
+ regenerateDataIdentifiers(data);
} finally {
lock.unlock();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
index 50779ff..93534ac 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
@@ -67,6 +67,29 @@ public class AgentConfigsHolder extends AgentHostDataHolder<AgentConfigsUpdateEv
}
@Override
+ public AgentConfigsUpdateEvent getUpdateIfChanged(String agentHash, Long hostId) throws AmbariException {
+ AgentConfigsUpdateEvent update = super.getUpdateIfChanged(agentHash, hostId);
+ if (update.getClustersConfigs() == null) {
+ update.setTimestamp(getData(hostId).getTimestamp());
+ }
+ return update;
+ }
+
+ @Override
+ protected void regenerateDataIdentifiers(AgentConfigsUpdateEvent data) {
+ data.setHash(null);
+ data.setTimestamp(null);
+ data.setHash(getHash(data));
+ data.setTimestamp(System.currentTimeMillis());
+ }
+
+ @Override
+ protected void setIdentifiersToEventUpdate(AgentConfigsUpdateEvent update, AgentConfigsUpdateEvent hostData) {
+ super.setIdentifiersToEventUpdate(update, hostData);
+ update.setTimestamp(hostData.getTimestamp());
+ }
+
+ @Override
protected AgentConfigsUpdateEvent getEmptyData() {
return AgentConfigsUpdateEvent.emptyUpdate();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java
index a8b0a32..3896a68 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java
@@ -35,12 +35,12 @@ public abstract class AgentDataHolder<T extends Hashable> {
protected abstract T getEmptyData();
- final void regenerateHash(T data) {
+ protected void regenerateDataIdentifiers(T data) {
data.setHash(null);
data.setHash(getHash(data));
}
- private String getHash(T data) {
+ protected String getHash(T data) {
String json = new Gson().toJson(data);
String generatedPassword = null;
try {
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
index 7c540f9..7f1c1a9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
@@ -53,7 +53,7 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash
if (hostData == null) {
hostData = getCurrentData(hostId);
if (regenerateHash) {
- regenerateHash(hostData);
+ regenerateDataIdentifiers(hostData);
}
data.put(hostId, hostData);
}
@@ -68,12 +68,16 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash
initializeDataIfNeeded(update.getHostId(), false);
if (handleUpdate(update)) {
T hostData = getData(update.getHostId());
- regenerateHash(hostData);
- update.setHash(hostData.getHash());
+ regenerateDataIdentifiers(hostData);
+ setIdentifiersToEventUpdate(update, hostData);
stateUpdateEventPublisher.publish(update);
}
}
+ protected void setIdentifiersToEventUpdate(T update, T hostData) {
+ update.setHash(hostData.getHash());
+ }
+
/**
* Reset data for the given host. Used if changes are complex and it's easier to re-create data from scratch.
*/
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
index 89fc8bc..277caf6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
@@ -39,6 +39,8 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
*/
private String hash;
+ private Long timestamp;
+
/**
* Host identifier.
*/
@@ -53,6 +55,7 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
public AgentConfigsUpdateEvent(SortedMap<String, ClusterConfigs> clustersConfigs) {
super(Type.AGENT_CONFIGS);
this.clustersConfigs = clustersConfigs;
+ this.timestamp = System.currentTimeMillis();
}
@Override
@@ -64,6 +67,14 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
this.hash = hash;
}
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Long timestamp) {
+ this.timestamp = timestamp;
+ }
+
public void setHostId(Long hostId) {
this.hostId = hostId;
}
@@ -73,6 +84,10 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
return hostId;
}
+ public SortedMap<String, ClusterConfigs> getClustersConfigs() {
+ return clustersConfigs;
+ }
+
public static AgentConfigsUpdateEvent emptyUpdate() {
return new AgentConfigsUpdateEvent(null);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
index c97ed60..632323e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
@@ -36,6 +36,12 @@ public class ExecutionCommandEvent extends AmbariHostUpdateEvent {
private Long hostId;
/**
+ *
+ */
+ @JsonProperty("requiredConfigTimestamp")
+ private Long requiredConfigTimestamp;
+
+ /**
* Execution commands grouped by cluster id.
*/
@JsonProperty("clusters")
@@ -80,4 +86,12 @@ public class ExecutionCommandEvent extends AmbariHostUpdateEvent {
public Long getHostId() {
return hostId;
}
+
+ public Long getRequiredConfigTimestamp() {
+ return requiredConfigTimestamp;
+ }
+
+ public void setRequiredConfigTimestamp(Long requiredConfigTimestamp) {
+ this.requiredConfigTimestamp = requiredConfigTimestamp;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
index bacdc2d..2698bc3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
@@ -32,6 +32,7 @@ import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.AgentCommand;
import org.apache.ambari.server.agent.CancelCommand;
import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.agent.stomp.AgentConfigsHolder;
import org.apache.ambari.server.agent.stomp.dto.ExecutionCommandsCluster;
import org.apache.ambari.server.events.ExecutionCommandEvent;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
@@ -70,6 +71,9 @@ public class AgentCommandsPublisher {
@Inject
private StateUpdateEventPublisher stateUpdateEventPublisher;
+ @Inject
+ private AgentConfigsHolder agentConfigsHolder;
+
public void sendAgentCommand(Multimap<Long, AgentCommand> agentCommands) throws AmbariException {
if (agentCommands != null && !agentCommands.isEmpty()) {
Map<Long, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters = new TreeMap<>();
@@ -82,6 +86,7 @@ public class AgentCommandsPublisher {
Long hostId = hostEntry.getKey();
ExecutionCommandEvent executionCommandEvent = new ExecutionCommandEvent(hostEntry.getValue());
executionCommandEvent.setHostId(hostId);
+ executionCommandEvent.setRequiredConfigTimestamp(agentConfigsHolder.getCurrentData(hostId).getTimestamp());
stateUpdateEventPublisher.publish(executionCommandEvent);
}
}