Posted to commits@ambari.apache.org by wu...@apache.org on 2022/11/15 14:55:40 UTC
[ambari-metrics] branch master updated: AMBARI-25569: Reassess Ambari Metrics data migration (#62)
This is an automated email from the ASF dual-hosted git repository.
wuzhiguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git
The following commit(s) were added to refs/heads/master by this push:
new 85fbf27 AMBARI-25569: Reassess Ambari Metrics data migration (#62)
85fbf27 is described below
commit 85fbf2730e776c29b5f80795fdccdd9ebff3f600
Author: lucasbak <lu...@gmail.com>
AuthorDate: Tue Nov 15 15:55:34 2022 +0100
AMBARI-25569: Reassess Ambari Metrics data migration (#62)
---
.../upgrade/core/AbstractPhoenixMetricsCopier.java | 84 +++----
.../upgrade/core/MetricsDataMigrationLauncher.java | 272 ++++++++++++---------
.../upgrade/core/PhoenixClusterMetricsCopier.java | 17 +-
.../upgrade/core/PhoenixHostMetricsCopier.java | 17 +-
4 files changed, 207 insertions(+), 183 deletions(-)
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
index 3d2002b..f70dc3f 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
@@ -20,9 +20,10 @@ package org.apache.ambari.metrics.core.timeline.upgrade.core;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import java.io.FileWriter;
import java.io.IOException;
+import java.io.Writer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -31,16 +32,16 @@ import java.util.Set;
public abstract class AbstractPhoenixMetricsCopier implements Runnable {
private static final Log LOG = LogFactory.getLog(AbstractPhoenixMetricsCopier.class);
- private static final Long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
- private final Long startTime;
- protected final FileWriter processedMetricsFile;
+ private static final long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
+ private final long startTime;
+ protected final Writer processedMetricsFile;
protected String inputTable;
protected String outputTable;
protected Set<String> metricNames;
protected PhoenixHBaseAccessor hBaseAccessor;
public AbstractPhoenixMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor,
- Set<String> metricNames, Long startTime, FileWriter outputStream) {
+ Set<String> metricNames, long startTime, Writer outputStream) {
this.inputTable = inputTableName;
this.outputTable = outputTableName;
this.hBaseAccessor = hBaseAccessor;
@@ -52,7 +53,7 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable {
@Override
public void run(){
LOG.info(String.format("Copying %s metrics from %s to %s", metricNames, inputTable, outputTable));
- long startTimer = System.currentTimeMillis();
+ long timerStart = System.currentTimeMillis();
String query = String.format("SELECT %s %s FROM %s WHERE %s AND SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME",
getQueryHint(startTime), getColumnsClause(), inputTable, getMetricNamesLikeClause(), startTime);
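Filled in with hypothetical values, the format string above expands to a query along these lines (hint, column list and name filter supplied by getQueryHint, getColumnsClause and getMetricNamesLikeClause; columns elided here):

    SELECT /*+ NATIVE_TIME_RANGE(1668436800000) */ <columns> FROM METRIC_RECORD_MINUTE
    WHERE (METRIC_NAME LIKE 'cpu_user') AND SERVER_TIME > 1668436920000
    ORDER BY METRIC_NAME, SERVER_TIME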
@@ -62,23 +63,20 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable {
saveMetrics();
} catch (SQLException e) {
LOG.error(e);
- }
- long estimatedTime = System.currentTimeMillis() - startTimer;
- LOG.debug(String.format("Copying took %s seconds from table %s to table %s for metric names %s", estimatedTime/ 1000.0, inputTable, outputTable, metricNames));
+ } finally {
+ long timerDelta = System.currentTimeMillis() - timerStart;
+ LOG.debug(String.format("Copying took %s seconds from table %s to table %s for metric names %s", timerDelta / 1000.0, inputTable, outputTable, metricNames));
- saveMetricsProgress();
+ saveMetricsProgress();
+ }
}
private String getMetricNamesLikeClause() {
- StringBuilder sb = new StringBuilder();
+ StringBuilder sb = new StringBuilder(256);
sb.append('(');
int i = 0;
for (String metricName : metricNames) {
- sb.append("METRIC_NAME");
- sb.append(" LIKE ");
- sb.append("'");
- sb.append(metricName);
- sb.append("'");
+ sb.append("METRIC_NAME LIKE '").append(metricName).append("'");
if (i < metricNames.size() - 1) {
sb.append(" OR ");
@@ -94,32 +92,15 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable {
private void runPhoenixQueryAndAddToResults(String query) {
LOG.debug(String.format("Running query: %s", query));
- Connection conn = null;
- PreparedStatement stmt = null;
- try {
- conn = hBaseAccessor.getConnection();
- stmt = conn.prepareStatement(query);
- ResultSet rs = stmt.executeQuery();
- while (rs.next()) {
- addToResults(rs);
+ try (Connection conn = hBaseAccessor.getConnection();
+ PreparedStatement stmt = conn.prepareStatement(query)) {
+ try (ResultSet rs = stmt.executeQuery()) {
+ while (rs.next()) {
+ addToResults(rs);
+ }
}
} catch (SQLException e) {
LOG.error(String.format("Exception while running Phoenix query %s", query), e);
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
}
}
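A note on the rewrite above: try-with-resources closes the ResultSet, PreparedStatement and Connection in reverse order of acquisition even when executeQuery throws, which the removed null-checked finally block approximated by hand; it also closes the ResultSet explicitly rather than relying on statement close to do it.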
@@ -127,29 +108,34 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable {
* Saves processed metric names to the provided file in the format TABLE_NAME:METRIC_NAME
*/
private void saveMetricsProgress() {
- if (processedMetricsFile == null) {
+ if (this.processedMetricsFile == null) {
LOG.info("Skipping metrics progress save as the file is null");
return;
}
+
for (String metricName : metricNames) {
try {
- processedMetricsFile.append(inputTable + ":" + metricName + System.lineSeparator());
+ synchronized (this.processedMetricsFile) {
+ this.processedMetricsFile.append(inputTable).append(":").append(metricName).append(System.lineSeparator());
+ }
} catch (IOException e) {
LOG.error(e);
}
}
}
- protected String getQueryHint(Long startTime) {
- StringBuilder sb = new StringBuilder();
- sb.append("/*+ ");
- sb.append("NATIVE_TIME_RANGE(");
- sb.append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY);
- sb.append(") ");
- sb.append("*/");
- return sb.toString();
+ protected String getQueryHint(long startTime) {
+ return new StringBuilder().append("/*+ NATIVE_TIME_RANGE(").append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY).append(") */").toString();
}
+ protected MetricHostAggregate extractMetricHostAggregate(ResultSet rs) throws SQLException {
+ MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+ metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+ metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+ metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+ metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+ return metricHostAggregate;
+ }
/**
* Saves aggregated metrics to HBase
* @throws SQLException
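Two clarifications on the helpers above. saveMetricsProgress writes one line per copied metric in the TABLE_NAME:METRIC_NAME format that readProcessedMetricsMap in MetricsDataMigrationLauncher parses back on restart, e.g. a hypothetical entry:

    METRIC_RECORD_MINUTE:cpu_user

And the new extractMetricHostAggregate helper centralizes the METRIC_SUM/METRIC_COUNT/METRIC_MAX/METRIC_MIN extraction that PhoenixClusterMetricsCopier and PhoenixHostMetricsCopier previously duplicated (see their diffs below).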
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
index 3a25aee..0c2f8e6 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
@@ -21,21 +21,29 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.Writer;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -44,6 +52,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
@@ -62,14 +71,14 @@ import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.M
public class MetricsDataMigrationLauncher {
private static final Log LOG = LogFactory.getLog(MetricsDataMigrationLauncher.class);
private static final Long DEFAULT_TIMEOUT_MINUTES = 60*24L;
- private static String patternPrefix = "._p_";
+ private static final String PATTERN_PREFIX = "._p_";
private static final int DEFAULT_BATCH_SIZE = 5;
+ private static final String MIGRATE_ALL_METRICS_ARG = "--allmetrics";
public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING = new HashMap<>();
public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new HashMap<>();
public static final String DEFAULT_PROCESSED_METRICS_FILE_LOCATION = "/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt";
public static final int DEFAULT_NUMBER_OF_THREADS = 3;
- public static final long ONE_MONTH_MILLIS = 2592000000L;
- public static final long DEFAULT_START_TIME = System.currentTimeMillis() - ONE_MONTH_MILLIS; //Last month
+ public static final int DEFAULT_START_DAYS = 30; // 30 Days, Last month
static {
CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME, METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME);
@@ -83,127 +92,163 @@ public class MetricsDataMigrationLauncher {
private final Set<Set<String>> metricNamesBatches;
private final String processedMetricsFilePath;
- private Set<String> metricNames;
- private Long startTime;
- private Integer batchSize;
- private Integer numberOfThreads;
+ private final long startTimeEpoch;
+ private final int numberOfThreads;
private TimelineMetricConfiguration timelineMetricConfiguration;
private PhoenixHBaseAccessor hBaseAccessor;
private TimelineMetricMetadataManager timelineMetricMetadataManager;
private Map<String, Set<String>> processedMetrics;
- public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer batchSize) throws Exception {
- this.startTime = startTime == null? DEFAULT_START_TIME : startTime;
- this.numberOfThreads = numberOfThreads == null? DEFAULT_NUMBER_OF_THREADS : numberOfThreads;
- this.batchSize = batchSize == null? DEFAULT_BATCH_SIZE : batchSize;
- this.processedMetricsFilePath = processedMetricsFilePath == null? DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
+ public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startDay, Integer numberOfThreads, Integer batchSize) throws Exception {
+ this.startTimeEpoch = calculateStartEpochTime(startDay);
+ this.numberOfThreads = (numberOfThreads == null) ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads;
+ this.processedMetricsFilePath = (processedMetricsFilePath == null) ? DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
initializeHbaseAccessor();
+ readProcessedMetricsMap();
+
+ final Set<String> metricNames = getMetricNames(whitelistedFilePath);
- LOG.info("Looking for whitelisted metric names...");
+ LOG.info("Setting up batches...");
+ if (batchSize == null) batchSize = DEFAULT_BATCH_SIZE;
+ this.metricNamesBatches = new HashSet<>(batchSize);
+
+ Iterables.partition(metricNames, batchSize)
+ .forEach(batch -> metricNamesBatches.add(new HashSet<>(batch)));
+ LOG.info(String.format("Split metric names into %s batches with size of %s", metricNamesBatches.size(), batchSize));
+ }
- if (whitelistedFilePath != null) {
- this.metricNames = readMetricWhitelistFromFile(whitelistedFilePath);
+ private long calculateStartEpochTime(Long startDay) {
+ final long days;
+ if (startDay == null) {
+ LOG.info(String.format("No starting day has been provided, using default: %d", DEFAULT_START_DAYS));
+ days = DEFAULT_START_DAYS;
} else {
- String whitelistFile = timelineMetricConfiguration.getMetricsConf().get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE, TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT);
- metricNames = readMetricWhitelistFromFile(whitelistFile);
+ LOG.info(String.format("%d days have been provided as the migration starting day.", startDay));
+ days = startDay;
}
+ LOG.info(String.format("The last %d days' data will be migrated.", days));
- readProcessedMetricsMap();
+ return LocalDateTime.now().minusDays(days).toEpochSecond(ZoneOffset.UTC);
+ }
- LOG.info("Setting up batches...");
- this.metricNamesBatches = new HashSet<>();
+ private Set<String> getMetricNames(String whitelistedFilePath) throws MalformedURLException, URISyntaxException, SQLException {
+ if(StringUtils.isNotEmpty(whitelistedFilePath) && whitelistedFilePath.equalsIgnoreCase(MIGRATE_ALL_METRICS_ARG)) {
+ LOG.info("Migration of all metrics has been requested by the " + MIGRATE_ALL_METRICS_ARG + " argument.");
+ LOG.info("Looking for all the metric names in the Metrics Database...");
+ return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream()
+ .map(TimelineMetricMetadataKey::getMetricName).collect(Collectors.toSet());
+ }
- Iterables.partition(metricNames, this.batchSize)
- .forEach(batch -> metricNamesBatches.add(new HashSet<>(batch)));
- LOG.info(String.format("Split metric names into %s batches with size of %s", metricNamesBatches.size(), this.batchSize));
- }
+ if(StringUtils.isNotEmpty(whitelistedFilePath)) {
+ LOG.info(String.format("Whitelist file %s has been provided.", whitelistedFilePath));
+ LOG.info("Looking for whitelisted metric names based on the file content...");
+ return readMetricWhitelistFromFile(whitelistedFilePath);
+ }
+
+ final Configuration conf = this.timelineMetricConfiguration.getMetricsConf();
+ if (Boolean.parseBoolean(conf.get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_ENABLED))) {
+ whitelistedFilePath = conf.get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE,
+ TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT);
+ LOG.info(String.format("No whitelist file has been provided but Ambari Metrics Whitelisting is enabled. " +
+ "Using %s as whitelist file.", whitelistedFilePath));
+ LOG.info("Looking for whitelisted metric names based on the file content...");
+ return readMetricWhitelistFromFile(whitelistedFilePath);
+ }
+ LOG.info("No whitelist file has been provided and Ambari Metrics Whitelisting is disabled.");
+ LOG.info("Looking for all the metric names in the Metrics Database...");
+ return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream()
+ .map(TimelineMetricMetadataKey::getMetricName).collect(Collectors.toSet());
+ }
private void readProcessedMetricsMap() {
- Map<String, Set<String>> result = new HashMap<>();
- if (!Files.exists(Paths.get(processedMetricsFilePath))) {
- LOG.info(String.format("The processed metrics file %s is missing, assuming there were no metrics processed.", processedMetricsFilePath));
- this.processedMetrics = new HashMap<>();
- }
- LOG.info(String.format("Reading the list of already copied metrics from %s", processedMetricsFilePath));
- try {
- try (Stream<String> stream = Files.lines(Paths.get(processedMetricsFilePath))) {
- stream.forEach( line -> {
- String [] lineSplit = line.split(":");
- if (!result.containsKey(lineSplit[0])) {
- result.put(lineSplit[0], new HashSet<>(Collections.singletonList(lineSplit[1])));
- } else {
- result.get(lineSplit[0]).add(lineSplit[1]);
- }
- });
+ final Map<String, Set<String>> result = new HashMap<>();
+ final Path path = Paths.get(this.processedMetricsFilePath);
+
+ if (Files.notExists(path)) {
+ LOG.info(String.format("The processed metrics file %s is missing, assuming there were no metrics processed.", this.processedMetricsFilePath));
+ } else {
+ LOG.info(String.format("Reading the list of already copied metrics from %s", this.processedMetricsFilePath));
+ try {
+ try (Stream<String> stream = Files.lines(path)) {
+ stream.forEach(line -> {
+ String[] lineSplit = line.split(":");
+ if (!result.containsKey(lineSplit[0])) {
+ result.put(lineSplit[0], new HashSet<>(Collections.singletonList(lineSplit[1])));
+ } else {
+ result.get(lineSplit[0]).add(lineSplit[1]);
+ }
+ });
+ }
+ } catch (IOException e) {
+ LOG.error(e);
}
- } catch (IOException e) {
- LOG.error(e);
}
this.processedMetrics = result;
}
public void runMigration(Long timeoutInMinutes) throws IOException {
-
- FileWriter processedMetricsFileWriter = new FileWriter(processedMetricsFilePath, true);
- LOG.info("Setting up copiers...");
- Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>();
- for (Set<String> batch : metricNamesBatches) {
- for (Map.Entry<String, String> entry : CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) {
- Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey());
- if (!filteredMetrics.isEmpty()) {
- copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor,
- filteredMetrics, startTime, processedMetricsFileWriter));
+ try (Writer processedMetricsFileWriter = new BufferedWriter(new FileWriter(this.processedMetricsFilePath, true))) {
+ LOG.info("Setting up copiers...");
+ Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>();
+ for (Set<String> batch : metricNamesBatches) {
+ for (Map.Entry<String, String> entry : CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) {
+ Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey());
+ if (!filteredMetrics.isEmpty()) {
+ copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), entry.getValue(), this.hBaseAccessor,
+ filteredMetrics, this.startTimeEpoch, processedMetricsFileWriter));
+ }
}
- }
- for (Map.Entry<String, String> entry : HOST_AGGREGATE_TABLES_MAPPING.entrySet()) {
- Set<String> filteredMetrics = filterProcessedMetrics(batch, processedMetrics, entry.getKey());
- if (!filteredMetrics.isEmpty()) {
- copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor,
- filteredMetrics, startTime, processedMetricsFileWriter));
+ for (Map.Entry<String, String> entry : HOST_AGGREGATE_TABLES_MAPPING.entrySet()) {
+ Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey());
+ if (!filteredMetrics.isEmpty()) {
+ copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), entry.getValue(), this.hBaseAccessor,
+ filteredMetrics, this.startTimeEpoch, processedMetricsFileWriter));
+ }
}
}
- }
- if (copiers.isEmpty()) {
- LOG.info("No copy threads to run, looks like all metrics have been copied.");
- processedMetricsFileWriter.close();
- return;
- }
-
- LOG.info("Running the copy threads...");
- long startTimer = System.currentTimeMillis();
- ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads == null ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads);
- for (AbstractPhoenixMetricsCopier copier : copiers) {
- executorService.submit(copier);
- }
+ if (copiers.isEmpty()) {
+ LOG.info("No copy threads to run, looks like all metrics have been copied.");
+ return;
+ }
- executorService.shutdown();
+ LOG.info("Running the copy threads...");
+ long timerStart = System.currentTimeMillis();
+ ExecutorService executorService = null;
+ try {
+ executorService = Executors.newFixedThreadPool(this.numberOfThreads);
+ for (AbstractPhoenixMetricsCopier copier : copiers) {
+ executorService.submit(copier);
+ }
+ } finally {
+ if (executorService != null) {
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ }
- try {
- executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- LOG.error(e);
+ long timerDelta = System.currentTimeMillis() - timerStart;
+ LOG.info(String.format("Copying took %s seconds", timerDelta / 1000.0));
}
-
- long estimatedTime = System.currentTimeMillis() - startTimer;
- LOG.info(String.format("Copying took %s seconds", estimatedTime/1000.0));
-
- processedMetricsFileWriter.close();
}
private void initializeHbaseAccessor() throws MalformedURLException, URISyntaxException {
this.hBaseAccessor = new PhoenixHBaseAccessor(null);
this.timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
- timelineMetricConfiguration.initialize();
+ this.timelineMetricConfiguration.initialize();
- timelineMetricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
- timelineMetricMetadataManager.initializeMetadata(false);
+ this.timelineMetricMetadataManager = new TimelineMetricMetadataManager(this.hBaseAccessor);
+ this.timelineMetricMetadataManager.initializeMetadata(false);
- hBaseAccessor.setMetadataInstance(timelineMetricMetadataManager);
+ this.hBaseAccessor.setMetadataInstance(this.timelineMetricMetadataManager);
}
private static Set<String> filterProcessedMetrics(Set<String> metricNames, Map<String, Set<String>> processedMetrics, String tableName) {
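One detail worth flagging in calculateStartEpochTime above: LocalDateTime.toEpochSecond returns seconds since the epoch, while the DEFAULT_START_TIME it replaces was built from System.currentTimeMillis(), that is, the millisecond unit the copiers' SERVER_TIME > ... predicate compared against before this change. If a millisecond cutoff is what the tables expect, a minimal java.time sketch of the equivalent computation would be:

    // Hypothetical millisecond cutoff for the last `days` days.
    // Instant also sidesteps reading the local wall clock as UTC, which
    // LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) implies.
    long startTimeMillis = java.time.Instant.now()
        .minus(java.time.Duration.ofDays(days))
        .toEpochMilli();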
@@ -219,21 +264,18 @@ public class MetricsDataMigrationLauncher {
*/
private static Set<String> readMetricWhitelistFromFile(String whitelistFile) {
LOG.info(String.format("Reading metric names from %s", whitelistFile));
- Set<String> whitelistedMetrics = new HashSet<>();
+ final Set<String> whitelistedMetrics = new HashSet<>();
- BufferedReader br = null;
String strLine;
- try(FileInputStream fstream = new FileInputStream(whitelistFile)) {
- br = new BufferedReader(new InputStreamReader(fstream));
-
+ try(BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(whitelistFile)))) {
while ((strLine = br.readLine()) != null) {
strLine = strLine.trim();
if (StringUtils.isEmpty(strLine)) {
continue;
}
- if (strLine.startsWith(patternPrefix)) {
- strLine = strLine.replace(patternPrefix, "");
+ if (strLine.startsWith(PATTERN_PREFIX)) {
+ strLine = strLine.replace(PATTERN_PREFIX, "");
}
if (strLine.contains("*")) {
strLine = strLine.replaceAll("\\*", "%");
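As a worked example of the normalization above, hypothetical whitelist lines map as follows:

    cpu_user        ->  cpu_user
    ._p_load_one*   ->  load_one%      (pattern prefix stripped, '*' becomes SQL LIKE's '%')
    regionserver.*  ->  regionserver.%

The resulting names feed the METRIC_NAME LIKE clause assembled in AbstractPhoenixMetricsCopier.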
@@ -248,21 +290,24 @@ public class MetricsDataMigrationLauncher {
private void saveMetadata() throws SQLException {
LOG.info("Saving metadata to store...");
- timelineMetricMetadataManager.updateMetadataCacheUsingV1Tables();
- timelineMetricMetadataManager.forceMetricsMetadataSync();
+ this.timelineMetricMetadataManager.updateMetadataCacheUsingV1Tables();
+ this.timelineMetricMetadataManager.forceMetricsMetadataSync();
LOG.info("Metadata was saved.");
}
-
/**
*
* @param args
* REQUIRED args[0] - processedMetricsFilePath - full path to the file where processed metrics are/will be stored
*
* OPTIONAL args[1] - whitelistedFilePath - full path to the file with whitelisted metric names
- * if not provided the default whitelist file location will be used if configured
- * if not configured - will result in error
- * args[2] - startTime - default value is set to the last 30 days
+ if not provided and AMS whitelisting is enabled, the default whitelist
+ file location will be used if configured
+ if not provided and AMS whitelisting is disabled, then no whitelisting
+ will be used and all the metrics will be migrated
+ if the --allmetrics switch is provided, all the metrics will be migrated
+ regardless of other settings
+ * args[2] - startDay - default value is set to the last 30 days
* args[3] - numberOfThreads - default value is 3
* args[4] - batchSize - default value is 5
* args[5] - timeoutInMinutes - default value is set to the equivalent of 24 hours
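For illustration, an invocation spelling out the documented defaults (values hypothetical) could look like:

    // state file, migrate all metrics, last 30 days, 3 threads,
    // batches of 5, 24-hour timeout (in minutes)
    MetricsDataMigrationLauncher.main(new String[]{
        "/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt",
        "--allmetrics", "30", "3", "5", "1440"
    });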
@@ -270,7 +315,7 @@ public class MetricsDataMigrationLauncher {
public static void main(String[] args) {
String processedMetricsFilePath = null;
String whitelistedFilePath = null;
- Long startTime = null;
+ Long startDay = null;
Integer numberOfThreads = null;
Integer batchSize = null;
Long timeoutInMinutes = DEFAULT_TIMEOUT_MINUTES;
@@ -282,7 +327,7 @@ public class MetricsDataMigrationLauncher {
whitelistedFilePath = args[1];
}
if (args.length>2) {
- startTime = Long.valueOf(args[2]);
+ startDay = Long.valueOf(args[2]);
}
if (args.length>3) {
numberOfThreads = Integer.valueOf(args[3]);
@@ -297,30 +342,29 @@ public class MetricsDataMigrationLauncher {
MetricsDataMigrationLauncher dataMigrationLauncher = null;
try {
LOG.info("Initializing system...");
- dataMigrationLauncher = new MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, startTime, numberOfThreads, batchSize);
+ dataMigrationLauncher = new MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, startDay, numberOfThreads, batchSize);
} catch (Exception e) {
LOG.error("Exception during system setup, exiting...", e);
System.exit(1);
}
+ int exitCode = 0;
try {
- //Setup shutdown hook for metadata save.
- MetricsDataMigrationLauncher finalDataMigrationLauncher = dataMigrationLauncher;
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- finalDataMigrationLauncher.saveMetadata();
- } catch (SQLException e) {
- LOG.error("Exception during metadata saving, exiting...", e);
- }
- }));
-
dataMigrationLauncher.runMigration(timeoutInMinutes);
- } catch (IOException e) {
+ } catch (Throwable e) {
+ exitCode = 1;
LOG.error("Exception during data migration, exiting...", e);
- System.exit(1);
+ } finally {
+ try {
+ dataMigrationLauncher.saveMetadata();
+ } catch (SQLException e) {
+ exitCode = 1;
+ LOG.error("Exception while saving the Metadata, exiting...", e);
+ }
}
- System.exit(0);
+ if(exitCode == 0) LOG.info("Data migration finished successfully.");
+ System.exit(exitCode);
}
}
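Net effect of the main() rework above: the metadata save moves from a best-effort JVM shutdown hook into a finally block, so it runs whether the migration succeeds or fails, and a failure while saving metadata now surfaces in the process exit code rather than only in the log.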
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
index ee65f00..0840be8 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import java.io.FileWriter;
+import java.io.Writer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
@@ -32,9 +32,9 @@ import java.util.Set;
public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier {
private static final Log LOG = LogFactory.getLog(PhoenixClusterMetricsCopier.class);
- private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+ private final Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
- PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) {
+ PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime, Writer processedMetricsFileWriter) {
super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter);
}
@@ -53,7 +53,7 @@ public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier {
@Override
protected void saveMetrics() throws SQLException {
LOG.debug(String.format("Saving %s results read from %s into %s", aggregateMap.size(), inputTable, outputTable));
- hBaseAccessor.saveClusterAggregateRecordsSecond(aggregateMap, outputTable);
+ this.hBaseAccessor.saveClusterAggregateRecordsSecond(aggregateMap, outputTable);
}
@Override
@@ -62,13 +62,10 @@ public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier {
rs.getString("METRIC_NAME"), rs.getString("APP_ID"),
rs.getString("INSTANCE_ID"), rs.getLong("SERVER_TIME"));
- MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
- metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
- metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
- metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
- metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+ MetricHostAggregate metricHostAggregate = extractMetricHostAggregate(rs);
- aggregateMap.put(timelineMetric, metricHostAggregate);
+ this.aggregateMap.put(timelineMetric, metricHostAggregate);
+ }
}
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
index a4f0c23..5964a3a 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
@@ -23,7 +23,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import java.io.FileWriter;
+
+import java.io.Writer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
@@ -32,9 +33,9 @@ import java.util.Set;
public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier {
private static final Log LOG = LogFactory.getLog(PhoenixHostMetricsCopier.class);
- private Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+ private final Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
- PhoenixHostMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) {
+ PhoenixHostMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime, Writer processedMetricsFileWriter) {
super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter);
}
@@ -54,7 +55,7 @@ public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier {
@Override
protected void saveMetrics() throws SQLException {
LOG.debug(String.format("Saving %s results read from %s into %s", aggregateMap.size(), inputTable, outputTable));
- hBaseAccessor.saveHostAggregateRecords(aggregateMap, outputTable);
+ this.hBaseAccessor.saveHostAggregateRecords(aggregateMap, outputTable);
}
@Override
@@ -66,12 +67,8 @@ public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier {
timelineMetric.setInstanceId(rs.getString("INSTANCE_ID"));
timelineMetric.setStartTime(rs.getLong("SERVER_TIME"));
- MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
- metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
- metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
- metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
- metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+ MetricHostAggregate metricHostAggregate = extractMetricHostAggregate(rs);
- aggregateMap.put(timelineMetric, metricHostAggregate);
+ this.aggregateMap.put(timelineMetric, metricHostAggregate);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ambari.apache.org
For additional commands, e-mail: commits-help@ambari.apache.org