You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/11/27 13:24:35 UTC

ambari git commit: AMBARI-22427. Check if configurations used for commands are of correct version (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 54e84d403 -> 45d9ca671


AMBARI-22427. Check if configurations used for commands are of correct version (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/45d9ca67
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/45d9ca67
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/45d9ca67

Branch: refs/heads/branch-3.0-perf
Commit: 45d9ca6717c636f67ce024f1586a0f77413c17a2
Parents: 54e84d4
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon Nov 27 15:24:28 2017 +0200
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon Nov 27 15:24:28 2017 +0200

----------------------------------------------------------------------
 .../python/ambari_agent/ConfigurationBuilder.py | 13 ++++++-----
 .../ambari_agent/CustomServiceOrchestrator.py   |  4 +++-
 .../listeners/CommandsEventListener.py          |  3 +++
 .../listeners/ConfigurationEventListener.py     |  2 ++
 .../ambari_agent/TestAgentStompResponses.py     |  6 ++---
 .../stomp/configurations_update.json            |  3 ++-
 .../dummy_files/stomp/execution_commands.json   |  3 ++-
 .../agent/stomp/AgentClusterDataHolder.java     |  2 +-
 .../server/agent/stomp/AgentConfigsHolder.java  | 23 ++++++++++++++++++++
 .../server/agent/stomp/AgentDataHolder.java     |  4 ++--
 .../server/agent/stomp/AgentHostDataHolder.java | 10 ++++++---
 .../server/events/AgentConfigsUpdateEvent.java  | 15 +++++++++++++
 .../server/events/ExecutionCommandEvent.java    | 14 ++++++++++++
 .../publishers/AgentCommandsPublisher.java      |  5 +++++
 14 files changed, 90 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py b/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
index f8bdb42..1aed2a1 100644
--- a/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
+++ b/ambari-agent/src/main/python/ambari_agent/ConfigurationBuilder.py
@@ -27,9 +27,12 @@ class ConfigurationBuilder:
     self.topology_cache = initializer_module.topology_cache
     self.host_level_params_cache = initializer_module.host_level_params_cache
     self.configurations_cache = initializer_module.configurations_cache
-    
-  def get_configuration(self, cluster_id, service_name, component_name):
-    if cluster_id: 
+
+  def get_configuration(self, cluster_id, service_name, component_name, configurations_timestamp=None):
+    if cluster_id:
+      if configurations_timestamp and configurations_timestamp != self.configurations_cache.timestamp:
+        raise Exception("Command requires configs with timestamp={0} but configs on agent have timestamp={1}".format(configurations_timestamp, self.configurations_cache.timestamp))
+
       metadata_cache = self.metadata_cache[cluster_id]
       configurations_cache = self.configurations_cache[cluster_id]
       host_level_params_cache = self.host_level_params_cache[cluster_id]
@@ -49,7 +52,7 @@ class ConfigurationBuilder:
       if component_name in host_repos.componentRepos:
         repo_version_id = host_repos.componentRepos[component_name]
         command_dict['repositoryFile'] = host_repos.commandRepos[str(repo_version_id)]
-        
+
       component_dict = self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name)
       if component_dict is not None:
         command_dict.update({
@@ -74,7 +77,7 @@ class ConfigurationBuilder:
       }
     }
     return command_dict
-    
+
   @property
   def public_fqdn(self):
     hostname.public_hostname(self.config)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 0debb1b..5c0561a 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -461,7 +461,9 @@ class CustomServiceOrchestrator():
       service_name = None
       component_name = None
 
-    command_dict = self.configuration_builder.get_configuration(cluster_id, service_name, component_name)
+    required_config_timestamp = command_header['requiredConfigTimestamp'] if 'requiredConfigTimestamp' in command_header else None
+
+    command_dict = self.configuration_builder.get_configuration(cluster_id, service_name, component_name, required_config_timestamp)
     command = Utils.update_nested(Utils.get_mutable_copy(command_dict), command_header)
     return command
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
index ae8d400..dc3bea9 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
@@ -51,6 +51,9 @@ class CommandsEventListener(EventListener):
       if 'cancelCommands' in cluster_dict:
         cancel_commands += cluster_dict['cancelCommands']
 
+    for command in commands:
+      command['requiredConfigTimestamp'] = message['requiredConfigTimestamp']
+
     with self.action_queue.lock:
       self.action_queue.cancel(cancel_commands)
       self.action_queue.put(commands)

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
index a4884d8..1c083a6 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
@@ -40,6 +40,8 @@ class ConfigurationEventListener(EventListener):
     @param headers: headers dictionary
     @param message: message payload dictionary
     """
+    self.configuration_cache.timestamp = message.pop('timestamp')
+
     # this kind of response is received if hash was identical. And server does not need to change anything
     if message == {}:
       return

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index 162a809..39542d1 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -191,7 +191,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body='{}')
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{"timestamp":1510577217}')
     self.server.topic_manager.send(f)
 
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
@@ -242,7 +242,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body='{}')
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{"timestamp":1510577217}')
     self.server.topic_manager.send(f)
 
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
@@ -321,7 +321,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body='{}')
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{"timestamp":1510577217}')
     self.server.topic_manager.send(f)
 
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
index e8e1ab5..48e9e26 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
@@ -1,5 +1,6 @@
 {
   "hash": "a1a71f4b46feaf72bf33627d78bbdc3e",
+  "timestamp": 1510577217,
   "clusters": {
     "0":{
       "configurations":{
@@ -49,4 +50,4 @@
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
index 76dac1b..ebc2c36 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
@@ -1,4 +1,5 @@
 {
+  "requiredConfigTimestamp": 1510577217,
   "clusters": {
     "0":{
       "commands":[
@@ -77,4 +78,4 @@
       ]
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
index f966386..0b90b6d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
@@ -83,7 +83,7 @@ public abstract class AgentClusterDataHolder<T extends AmbariUpdateEvent & Hasha
   protected final void regenerateHash() {
     try {
       lock.lock();
-      regenerateHash(data);
+      regenerateDataIdentifiers(data);
     } finally {
       lock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
index 50779ff..93534ac 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
@@ -67,6 +67,29 @@ public class AgentConfigsHolder extends AgentHostDataHolder<AgentConfigsUpdateEv
   }
 
   @Override
+  public AgentConfigsUpdateEvent getUpdateIfChanged(String agentHash, Long hostId) throws AmbariException {
+    AgentConfigsUpdateEvent update = super.getUpdateIfChanged(agentHash, hostId);
+    if (update.getClustersConfigs() == null) {
+      update.setTimestamp(getData(hostId).getTimestamp());
+    }
+    return update;
+  }
+
+  @Override
+  protected void regenerateDataIdentifiers(AgentConfigsUpdateEvent data) {
+    data.setHash(null);
+    data.setTimestamp(null);
+    data.setHash(getHash(data));
+    data.setTimestamp(System.currentTimeMillis());
+  }
+
+  @Override
+  protected void setIdentifiersToEventUpdate(AgentConfigsUpdateEvent update, AgentConfigsUpdateEvent hostData) {
+    super.setIdentifiersToEventUpdate(update, hostData);
+    update.setTimestamp(hostData.getTimestamp());
+  }
+
+  @Override
   protected AgentConfigsUpdateEvent getEmptyData() {
     return AgentConfigsUpdateEvent.emptyUpdate();
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java
index a8b0a32..3896a68 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java
@@ -35,12 +35,12 @@ public abstract class AgentDataHolder<T extends Hashable> {
 
   protected abstract T getEmptyData();
 
-  final void regenerateHash(T data) {
+  protected void regenerateDataIdentifiers(T data) {
     data.setHash(null);
     data.setHash(getHash(data));
   }
 
-  private String getHash(T data) {
+  protected String getHash(T data) {
     String json = new Gson().toJson(data);
     String generatedPassword = null;
     try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
index 7c540f9..7f1c1a9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
@@ -53,7 +53,7 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash
     if (hostData == null) {
       hostData = getCurrentData(hostId);
       if (regenerateHash) {
-        regenerateHash(hostData);
+        regenerateDataIdentifiers(hostData);
       }
       data.put(hostId, hostData);
     }
@@ -68,12 +68,16 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash
     initializeDataIfNeeded(update.getHostId(), false);
     if (handleUpdate(update)) {
       T hostData = getData(update.getHostId());
-      regenerateHash(hostData);
-      update.setHash(hostData.getHash());
+      regenerateDataIdentifiers(hostData);
+      setIdentifiersToEventUpdate(update, hostData);
       stateUpdateEventPublisher.publish(update);
     }
   }
 
+  protected void setIdentifiersToEventUpdate(T update, T hostData) {
+    update.setHash(hostData.getHash());
+  }
+
   /**
    * Reset data for the given host.  Used if changes are complex and it's easier to re-create data from scratch.
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
index 89fc8bc..277caf6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
@@ -39,6 +39,8 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
    */
   private String hash;
 
+  private Long timestamp;
+
   /**
    * Host identifier.
    */
@@ -53,6 +55,7 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
   public AgentConfigsUpdateEvent(SortedMap<String, ClusterConfigs> clustersConfigs) {
     super(Type.AGENT_CONFIGS);
     this.clustersConfigs = clustersConfigs;
+    this.timestamp = System.currentTimeMillis();
   }
 
   @Override
@@ -64,6 +67,14 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
     this.hash = hash;
   }
 
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(Long timestamp) {
+    this.timestamp = timestamp;
+  }
+
   public void setHostId(Long hostId) {
     this.hostId = hostId;
   }
@@ -73,6 +84,10 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
     return hostId;
   }
 
+  public SortedMap<String, ClusterConfigs> getClustersConfigs() {
+    return clustersConfigs;
+  }
+
   public static AgentConfigsUpdateEvent emptyUpdate() {
     return new AgentConfigsUpdateEvent(null);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
index c97ed60..632323e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
@@ -36,6 +36,12 @@ public class ExecutionCommandEvent extends AmbariHostUpdateEvent {
   private Long hostId;
 
   /**
+   *
+   */
+  @JsonProperty("requiredConfigTimestamp")
+  private Long requiredConfigTimestamp;
+
+  /**
    * Execution commands grouped by cluster id.
    */
   @JsonProperty("clusters")
@@ -80,4 +86,12 @@ public class ExecutionCommandEvent extends AmbariHostUpdateEvent {
   public Long getHostId() {
     return hostId;
   }
+
+  public Long getRequiredConfigTimestamp() {
+    return requiredConfigTimestamp;
+  }
+
+  public void setRequiredConfigTimestamp(Long requiredConfigTimestamp) {
+    this.requiredConfigTimestamp = requiredConfigTimestamp;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/45d9ca67/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
index bacdc2d..2698bc3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
@@ -32,6 +32,7 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.AgentCommand;
 import org.apache.ambari.server.agent.CancelCommand;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.agent.stomp.AgentConfigsHolder;
 import org.apache.ambari.server.agent.stomp.dto.ExecutionCommandsCluster;
 import org.apache.ambari.server.events.ExecutionCommandEvent;
 import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
@@ -70,6 +71,9 @@ public class AgentCommandsPublisher {
   @Inject
   private StateUpdateEventPublisher stateUpdateEventPublisher;
 
+  @Inject
+  private AgentConfigsHolder agentConfigsHolder;
+
   public void sendAgentCommand(Multimap<Long, AgentCommand> agentCommands) throws AmbariException {
     if (agentCommands != null && !agentCommands.isEmpty()) {
       Map<Long, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters = new TreeMap<>();
@@ -82,6 +86,7 @@ public class AgentCommandsPublisher {
         Long hostId = hostEntry.getKey();
         ExecutionCommandEvent executionCommandEvent = new ExecutionCommandEvent(hostEntry.getValue());
         executionCommandEvent.setHostId(hostId);
+        executionCommandEvent.setRequiredConfigTimestamp(agentConfigsHolder.getCurrentData(hostId).getTimestamp());
         stateUpdateEventPublisher.publish(executionCommandEvent);
       }
     }