You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2017/04/19 15:05:35 UTC
[4/6] ambari git commit: AMBARI-20777 : AMS changes to use instanceId
for cluster based segregation of data. (avijayan)
AMBARI-20777 : AMS changes to use instanceId for cluster based segregation of data. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/323da508
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/323da508
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/323da508
Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 323da508944626c8ee82118ce67a000b8e5a511c
Parents: 8e2abf9
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Tue Apr 18 13:16:09 2017 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Tue Apr 18 13:16:09 2017 -0700
----------------------------------------------------------------------
.../timeline/HadoopTimelineMetricsSink.java | 4 +
.../timeline/HBaseTimelineMetricStore.java | 5 +
.../metrics/timeline/PhoenixHBaseAccessor.java | 106 +++++++++++++++++++
.../metrics/timeline/TimelineMetricStore.java | 8 ++
.../TimelineMetricClusterAggregatorSecond.java | 1 +
.../TimelineMetricMetadataManager.java | 38 +++++++
.../discovery/TimelineMetricMetadataSync.java | 57 ++++++++++
.../timeline/query/PhoenixTransactSQL.java | 12 +++
.../webapp/TimelineWebServices.java | 16 +++
.../timeline/TestTimelineMetricStore.java | 5 +
.../timeline/discovery/TestMetadataManager.java | 8 ++
.../timeline/discovery/TestMetadataSync.java | 12 +++
12 files changed, 272 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index a112ef2..8e0de03 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -51,6 +51,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
private Map<String, Set<String>> useTagsMap = new HashMap<String, Set<String>>();
private TimelineMetricsCache metricsCache;
private String hostName = "UNKNOWN.example.com";
+ private String instanceId = null;
private String serviceName = "";
private Collection<String> collectorHosts;
private String collectorUri;
@@ -94,6 +95,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
}
serviceName = getServiceName(conf);
+ String inst = conf.getString("instanceId", "");
+ instanceId = StringUtils.isEmpty(inst) ? null : inst;
LOG.info("Identified hostname = " + hostName + ", serviceName = " + serviceName);
// Initialize the collector write strategy
@@ -318,6 +321,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
timelineMetric.setMetricName(name);
timelineMetric.setHostName(hostName);
timelineMetric.setAppId(serviceName);
+ timelineMetric.setInstanceId(instanceId);
timelineMetric.setStartTime(startTime);
timelineMetric.setType(metric.type() != null ? metric.type().name() : null);
timelineMetric.getMetricValues().put(startTime, value.doubleValue());
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 17c58f0..fa095a0 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -388,6 +388,11 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
}
@Override
+ public Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException {
+ return metricMetadataManager.getHostedInstanceCache();
+ }
+
+ @Override
public List<String> getLiveInstances() {
List<String> instances = null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 8b0d84b..65bbc4c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -115,6 +115,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
@@ -124,6 +125,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_INSTANCE_HOST_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
@@ -138,6 +140,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
@@ -430,6 +433,11 @@ public class PhoenixHBaseAccessor {
encoding, compression);
stmt.executeUpdate(hostedAppSql);
+ //Host Instances table
+ String hostedInstancesSql = String.format(CREATE_INSTANCE_HOST_TABLE_SQL,
+ encoding, compression);
+ stmt.executeUpdate(hostedInstancesSql);
+
// Container Metrics
stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));
@@ -778,6 +786,8 @@ public class PhoenixHBaseAccessor {
metadataManager.putIfModifiedHostedAppsMetadata(
tm.getHostName(), tm.getAppId());
+
+ metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), tm.getHostName());
}
if (!acceptMetric) {
iterator.remove();
@@ -1552,6 +1562,55 @@ public class PhoenixHBaseAccessor {
}
}
+  /**
+   * Upsert one row per (instanceId, hostname) pair found in the supplied map,
+   * then commit once after every pair has been attempted.
+   * A failed upsert of a single row is logged and skipped; it does not abort
+   * the remaining rows or the commit.
+   *
+   * @param instanceHostsMap { instanceId : [ hostnames ] }
+   * @throws SQLException if a JDBC call other than an individual row upsert fails
+   */
+  public void saveInstanceHostsMetadata(Map<String, Set<String>> instanceHostsMap) throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement upsertStmt = null;
+    try {
+      upsertStmt = conn.prepareStatement(UPSERT_INSTANCE_HOST_METADATA_SQL);
+      int savedRows = 0;
+
+      for (Map.Entry<String, Set<String>> entry : instanceHostsMap.entrySet()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Host Instances Entry: " + entry);
+        }
+
+        String instance = entry.getKey();
+        for (String host : entry.getValue()) {
+          upsertStmt.clearParameters();
+          upsertStmt.setString(1, instance);
+          upsertStmt.setString(2, host);
+          try {
+            upsertStmt.executeUpdate();
+            savedRows++;
+          } catch (SQLException rowFailure) {
+            // Keep going: one bad row should not prevent the others from being saved
+            LOG.error("Error saving host instances metadata.", rowFailure);
+          }
+        }
+      }
+
+      conn.commit();
+      LOG.info("Saved " + savedRows + " host instances metadata records.");
+    } finally {
+      // Best-effort cleanup: failures while closing are deliberately ignored
+      try {
+        if (upsertStmt != null) {
+          upsertStmt.close();
+        }
+      } catch (SQLException ignored) {
+        // Ignore
+      }
+      try {
+        if (conn != null) {
+          conn.close();
+        }
+      } catch (SQLException ignored) {
+        // Ignore
+      }
+    }
+  }
+
+
/**
* Save metadata on updates.
* @param metricMetadata @Collection<@TimelineMetricMetadata>
@@ -1658,6 +1717,53 @@ public class PhoenixHBaseAccessor {
return hostedAppMap;
}
+  /**
+   * Read every row returned by GET_INSTANCE_HOST_METADATA_SQL and group the
+   * HOSTNAME values by their INSTANCE_ID.
+   *
+   * @return { instanceId : [ hostnames ] }; empty when the query yields no rows
+   * @throws SQLException if preparing, executing or reading the query fails
+   */
+  public Map<String, Set<String>> getInstanceHostsMetdata() throws SQLException {
+    Map<String, Set<String>> hostsByInstance = new HashMap<>();
+    Connection conn = getConnection();
+    PreparedStatement query = null;
+    ResultSet rows = null;
+
+    try {
+      query = conn.prepareStatement(GET_INSTANCE_HOST_METADATA_SQL);
+      rows = query.executeQuery();
+
+      while (rows.next()) {
+        String instanceId = rows.getString("INSTANCE_ID");
+        String hostname = rows.getString("HOSTNAME");
+
+        // Create the host set the first time an instance is seen
+        Set<String> hosts = hostsByInstance.get(instanceId);
+        if (hosts == null) {
+          hosts = new HashSet<String>();
+          hostsByInstance.put(instanceId, hosts);
+        }
+        hosts.add(hostname);
+      }
+
+    } finally {
+      // Best-effort cleanup in reverse order of acquisition; close failures are ignored
+      try {
+        if (rows != null) {
+          rows.close();
+        }
+      } catch (SQLException ignored) {
+        // Ignore
+      }
+      try {
+        if (query != null) {
+          query.close();
+        }
+      } catch (SQLException ignored) {
+        // Ignore
+      }
+      try {
+        if (conn != null) {
+          conn.close();
+        }
+      } catch (SQLException ignored) {
+        // Ignore
+      }
+    }
+
+    return hostsByInstance;
+  }
+
+
// No filter criteria support for now.
public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getTimelineMetricMetadata() throws SQLException {
Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataMap = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index d049e33..121a8ae 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -89,6 +89,14 @@ public interface TimelineMetricStore {
Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException;
/**
+ * Returns all instances and the set of hosts each instance is present on
+ * @return { instanceId : [ hosts ] }
+ * @throws SQLException
+ * @throws IOException
+ */
+ Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException;
+
+ /**
* Return a list of known live collector nodes
* @return [ hostname ]
*/
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 5310906..a5a3499 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -129,6 +129,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
condition.addOrderByColumn("METRIC_NAME");
condition.addOrderByColumn("HOSTNAME");
condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
condition.addOrderByColumn("SERVER_TIME");
return condition;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
index 7eb2457..f904ebe 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
@@ -54,8 +54,10 @@ public class TimelineMetricMetadataManager {
private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> METADATA_CACHE = new ConcurrentHashMap<>();
// Map to lookup apps on a host
private final Map<String, Set<String>> HOSTED_APPS_MAP = new ConcurrentHashMap<>();
+ private final Map<String, Set<String>> INSTANCE_HOST_MAP = new ConcurrentHashMap<>();
// Sync only when needed
AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false);
+ AtomicBoolean SYNC_HOSTED_INSTANCES_METADATA = new AtomicBoolean(false);
// Single thread to sync back new writes to the store
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
@@ -122,14 +124,25 @@ public class TimelineMetricMetadataManager {
return HOSTED_APPS_MAP;
}
+ public Map<String, Set<String>> getHostedInstanceCache() {
+ return INSTANCE_HOST_MAP;
+ }
+
public boolean syncHostedAppsMetadata() {
return SYNC_HOSTED_APPS_METADATA.get();
}
+ public boolean syncHostedInstanceMetadata() {
+ return SYNC_HOSTED_INSTANCES_METADATA.get();
+ }
+
public void markSuccessOnSyncHostedAppsMetadata() {
SYNC_HOSTED_APPS_METADATA.set(false);
}
+ public void markSuccessOnSyncHostedInstanceMetadata() {
+ SYNC_HOSTED_INSTANCES_METADATA.set(false);
+ }
/**
* Test metric name for valid patterns and return true/false
*/
@@ -189,6 +202,23 @@ public class TimelineMetricMetadataManager {
}
}
+  /**
+   * Record that {@code hostname} belongs to {@code instanceId} in the
+   * instance-to-hosts cache and raise the sync flag when the host was not
+   * already recorded for that instance.
+   * Does nothing when {@code StringUtils.isEmpty(instanceId)} is true.
+   *
+   * @param instanceId instance the metric was reported for
+   * @param hostname host the metric was reported from
+   */
+  public void putIfModifiedHostedInstanceMetadata(String instanceId, String hostname) {
+    if (!StringUtils.isEmpty(instanceId)) {
+      Set<String> knownHosts = INSTANCE_HOST_MAP.get(instanceId);
+      if (knownHosts == null) {
+        knownHosts = new HashSet<>();
+        INSTANCE_HOST_MAP.put(instanceId, knownHosts);
+      }
+
+      // Set.add returns true only when the element was newly inserted
+      if (knownHosts.add(hostname)) {
+        SYNC_HOSTED_INSTANCES_METADATA.set(true);
+      }
+    }
+  }
+
+
public void persistMetadata(Collection<TimelineMetricMetadata> metadata) throws SQLException {
hBaseAccessor.saveMetricMetadata(metadata);
}
@@ -197,6 +227,10 @@ public class TimelineMetricMetadataManager {
hBaseAccessor.saveHostAppsMetadata(hostedApps);
}
+ public void persistHostedInstanceMetadata(Map<String, Set<String>> hostedInstancesMetadata) throws SQLException {
+ hBaseAccessor.saveInstanceHostsMetadata(hostedInstancesMetadata);
+ }
+
public TimelineMetricMetadata getTimelineMetricMetadata(TimelineMetric timelineMetric, boolean isWhitelisted) {
return new TimelineMetricMetadata(
timelineMetric.getMetricName(),
@@ -233,6 +267,10 @@ public class TimelineMetricMetadataManager {
return hBaseAccessor.getHostedAppsMetadata();
}
+ Map<String, Set<String>> getHostedInstancesFromStore() throws SQLException {
+ return hBaseAccessor.getInstanceHostsMetdata();
+ }
+
private boolean supportAggregates(TimelineMetric metric) {
return MapUtils.isEmpty(metric.getMetadata()) ||
!(String.valueOf(true).equals(metric.getMetadata().get("skipAggregation")));
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
index 25b525a..6d519f6 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
@@ -45,11 +45,15 @@ public class TimelineMetricMetadataSync implements Runnable {
persistMetricMetadata();
LOG.debug("Persisting hosted apps metadata...");
persistHostAppsMetadata();
+ LOG.debug("Persisting hosted instance metadata...");
+ persistHostInstancesMetadata();
if (cacheManager.isDistributedModeEnabled()) {
LOG.debug("Refreshing metric metadata...");
refreshMetricMetadata();
LOG.debug("Refreshing hosted apps metadata...");
refreshHostAppsMetadata();
+ LOG.debug("Refreshing hosted instances metadata...");
+ refreshHostedInstancesMetadata();
}
}
@@ -147,6 +151,41 @@ public class TimelineMetricMetadataSync implements Runnable {
}
/**
+   * Sync hosted instances data if needed.
+   * Runs only when the cache manager reports pending hosted-instance changes.
+   * Cached entries whose instance is missing from the store, or whose stored
+   * host set does not contain every cached host, are written back. The pending
+   * flag is cleared only after the write succeeds, so a failed write is
+   * attempted again on a later run.
+   */
+  private void persistHostInstancesMetadata() {
+    if (!cacheManager.syncHostedInstanceMetadata()) {
+      return; // No change flagged since the last successful sync
+    }
+
+    Map<String, Set<String>> persistedData;
+    try {
+      persistedData = cacheManager.getHostedInstancesFromStore();
+    } catch (SQLException e) {
+      LOG.warn("Failed on fetching hosted instances data from store.", e);
+      return; // Something wrong with store
+    }
+
+    Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache();
+    if (cachedData == null || cachedData.isEmpty()) {
+      return;
+    }
+
+    Map<String, Set<String>> dataToSync = new HashMap<>();
+    for (Map.Entry<String, Set<String>> cacheEntry : cachedData.entrySet()) {
+      Set<String> persistedHosts =
+        (persistedData == null) ? null : persistedData.get(cacheEntry.getKey());
+      // No persistence / stale data in store
+      if (persistedHosts == null || !persistedHosts.containsAll(cacheEntry.getValue())) {
+        dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue());
+      }
+    }
+
+    try {
+      cacheManager.persistHostedInstanceMetadata(dataToSync);
+      cacheManager.markSuccessOnSyncHostedInstanceMetadata();
+    } catch (SQLException e) {
+      LOG.warn("Error persisting hosted instances metadata.", e);
+    }
+  }
+ /**
* Read all hosted apps metadata and update cached values - HA
*/
private void refreshHostAppsMetadata() {
@@ -166,4 +205,22 @@ public class TimelineMetricMetadataSync implements Runnable {
}
}
}
+
+  /**
+   * Read all hosted instances metadata from the store and fold it into the
+   * cached values - HA.
+   * Instances missing from the cache are added as-is; for instances already
+   * cached, hosts present in the store are merged into the cached host set so
+   * that hosts persisted by another writer become visible here as well.
+   * A failure to read the store is logged and leaves the cache untouched.
+   */
+  private void refreshHostedInstancesMetadata() {
+    Map<String, Set<String>> hostedInstancesFromStore;
+    try {
+      hostedInstancesFromStore = cacheManager.getHostedInstancesFromStore();
+    } catch (SQLException e) {
+      LOG.warn("Error refreshing metadata from store.", e);
+      return;
+    }
+    if (hostedInstancesFromStore == null) {
+      return;
+    }
+
+    Map<String, Set<String>> cachedData = cacheManager.getHostedInstanceCache();
+    for (Map.Entry<String, Set<String>> storeEntry : hostedInstancesFromStore.entrySet()) {
+      Set<String> cachedHosts = cachedData.get(storeEntry.getKey());
+      if (cachedHosts == null) {
+        cachedData.put(storeEntry.getKey(), storeEntry.getValue());
+      } else {
+        // Known instance: pick up any hosts the store has that the cache lacks
+        cachedHosts.addAll(storeEntry.getValue());
+      }
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 0c8e5a7..d39230d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -148,6 +148,12 @@ public class PhoenixTransactSQL {
"CONSTRAINT pk PRIMARY KEY (HOSTNAME))" +
"DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
+ public static final String CREATE_INSTANCE_HOST_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS INSTANCE_HOST_METADATA " +
+ "(INSTANCE_ID VARCHAR, HOSTNAME VARCHAR, " +
+ "CONSTRAINT pk PRIMARY KEY (INSTANCE_ID, HOSTNAME))" +
+ "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
+
public static final String ALTER_METRICS_METADATA_TABLE =
"ALTER TABLE METRICS_METADATA ADD IF NOT EXISTS IS_WHITELISTED BOOLEAN";
@@ -230,6 +236,9 @@ public class PhoenixTransactSQL {
public static final String UPSERT_HOSTED_APPS_METADATA_SQL =
"UPSERT INTO HOSTED_APPS_METADATA (HOSTNAME, APP_IDS) VALUES (?, ?)";
+ public static final String UPSERT_INSTANCE_HOST_METADATA_SQL =
+ "UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)";
+
/**
* Retrieve a set of rows from metrics records table.
*/
@@ -309,6 +318,9 @@ public class PhoenixTransactSQL {
public static final String GET_HOSTED_APPS_METADATA_SQL = "SELECT " +
"HOSTNAME, APP_IDS FROM HOSTED_APPS_METADATA";
+ public static final String GET_INSTANCE_HOST_METADATA_SQL = "SELECT " +
+ "INSTANCE_ID, HOSTNAME FROM INSTANCE_HOST_METADATA";
+
/**
* Aggregate host metrics using a GROUP BY clause to take advantage of
* N - way parallel scan where N = number of regions.
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 304a8e0..6278c59 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -412,6 +412,22 @@ public class TimelineWebServices {
}
}
+  /**
+   * GET /metrics/instances - returns the instance-to-hosts metadata reported
+   * by the timeline metric store as JSON.
+   *
+   * @param req the incoming servlet request
+   * @param res the servlet response, passed to {@code init}
+   * @return { instanceId : [ hosts ] }
+   * @throws WebApplicationException with status 500 when the store lookup fails
+   */
+  @GET
+  @Path("/metrics/instances")
+  @Produces({ MediaType.APPLICATION_JSON })
+  public Map<String, Set<String>> getClusterHostsMetadata(
+    @Context HttpServletRequest req,
+    @Context HttpServletResponse res
+  ) {
+    init(res);
+
+    final Map<String, Set<String>> instanceHosts;
+    try {
+      instanceHosts = timelineMetricStore.getInstanceHostsMetadata();
+    } catch (Exception failure) {
+      // Any lookup failure is surfaced to the client as an internal server error
+      throw new WebApplicationException(failure, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    return instanceHosts;
+  }
+
+
/**
* This is a discovery endpoint that advertises known live collector
* instances. Note: It will always answer with current instance as live.
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index b2e8cac..b40481d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -97,6 +97,11 @@ public class TestTimelineMetricStore implements TimelineMetricStore {
}
@Override
+ public Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException {
+ return Collections.emptyMap();
+ }
+
+ @Override
public List<String> getLiveInstances() {
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
index b243e0b..c62fd34 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
@@ -69,6 +69,7 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest {
metric2.setStartTime(now - 1000);
metric2.setAppId("dummy_app2");
metric2.setType("Integer");
+ metric2.setInstanceId("instance2");
metric2.setMetricValues(new TreeMap<Long, Double>() {{
put(now - 100, 1.0);
put(now - 200, 2.0);
@@ -144,5 +145,12 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest {
Assert.assertEquals("dummy_app1", savedHostData.get("dummy_host1").iterator().next());
Assert.assertEquals("dummy_app2", savedHostData.get("dummy_host2").iterator().next());
Assert.assertEquals("dummy_app3", cachedHostData.get("dummy_host3").iterator().next());
+
+
+ Map<String, Set<String>> cachedHostInstanceData = metadataManager.getHostedInstanceCache();
+ Map<String, Set<String>> savedHostInstanceData = metadataManager.getHostedInstancesFromStore();
+ Assert.assertEquals(cachedHostInstanceData.size(), savedHostInstanceData.size());
+ Assert.assertEquals("dummy_host2", cachedHostInstanceData.get("instance2").iterator().next());
+
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/323da508/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
index 5eab903..181abca 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
@@ -56,9 +56,15 @@ public class TestMetadataSync {
put("h2", new HashSet<>(Arrays.asList("a1", "a2")));
}};
+ Map<String, Set<String>> hostedInstances = new HashMap<String, Set<String>>() {{
+ put("i1", new HashSet<>(Arrays.asList("h1")));
+ put("i2", new HashSet<>(Arrays.asList("h1", "h2")));
+ }};
+
expect(configuration.get("timeline.metrics.service.operation.mode", "")).andReturn("distributed");
expect(hBaseAccessor.getTimelineMetricMetadata()).andReturn(metadata);
expect(hBaseAccessor.getHostedAppsMetadata()).andReturn(hostedApps);
+ expect(hBaseAccessor.getInstanceHostsMetdata()).andReturn(hostedInstances);
replay(configuration, hBaseAccessor);
@@ -80,6 +86,12 @@ public class TestMetadataSync {
Assert.assertEquals(2, hostedApps.size());
Assert.assertEquals(1, hostedApps.get("h1").size());
Assert.assertEquals(2, hostedApps.get("h2").size());
+
+ hostedInstances = metadataManager.getHostedInstanceCache();
+ Assert.assertEquals(2, hostedInstances.size());
+ Assert.assertEquals(1, hostedInstances.get("i1").size());
+ Assert.assertEquals(2, hostedInstances.get("i2").size());
+
}
@Test