You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/04/03 00:00:52 UTC
[7/7] ambari git commit: AMBARI-10290. Expose avaialble host metrics
across hostcomponents. (swagle)
AMBARI-10290. Expose avaialble host metrics across hostcomponents. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b93452ed
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b93452ed
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b93452ed
Branch: refs/heads/trunk
Commit: b93452edab3d93a7217751192145eab3944876c1
Parents: 81f311b
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Mar 31 16:07:12 2015 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Thu Apr 2 14:55:29 2015 -0700
----------------------------------------------------------------------
.../timeline/AbstractTimelineAggregator.java | 271 ------
.../metrics/timeline/AggregatorUtils.java | 59 --
.../metrics/timeline/ConnectionProvider.java | 29 -
.../timeline/DefaultPhoenixDataSource.java | 77 --
.../metrics/timeline/Function.java | 169 ----
.../timeline/HBaseTimelineMetricStore.java | 10 +-
.../metrics/timeline/MetricAggregate.java | 110 ---
.../timeline/MetricClusterAggregate.java | 74 --
.../metrics/timeline/MetricHostAggregate.java | 81 --
.../metrics/timeline/PhoenixHBaseAccessor.java | 61 +-
.../metrics/timeline/PhoenixTransactSQL.java | 970 -------------------
.../metrics/timeline/TimelineClusterMetric.java | 97 --
.../timeline/TimelineClusterMetricReader.java | 42 -
.../timeline/TimelineMetricAggregator.java | 145 ---
.../TimelineMetricAggregatorFactory.java | 99 --
.../TimelineMetricClusterAggregator.java | 223 -----
.../TimelineMetricClusterAggregatorHourly.java | 177 ----
.../timeline/TimelineMetricConfiguration.java | 6 +
.../metrics/timeline/TimelineMetricReader.java | 65 --
.../aggregators/AbstractTimelineAggregator.java | 270 ++++++
.../timeline/aggregators/AggregatorUtils.java | 59 ++
.../metrics/timeline/aggregators/Function.java | 169 ++++
.../timeline/aggregators/MetricAggregate.java | 110 +++
.../aggregators/MetricClusterAggregate.java | 73 ++
.../aggregators/MetricHostAggregate.java | 81 ++
.../aggregators/TimelineClusterMetric.java | 97 ++
.../TimelineClusterMetricReader.java | 42 +
.../aggregators/TimelineMetricAggregator.java | 147 +++
.../TimelineMetricAggregatorFactory.java | 98 ++
.../TimelineMetricAppAggregator.java | 169 ++++
.../TimelineMetricClusterAggregator.java | 235 +++++
.../TimelineMetricClusterAggregatorHourly.java | 175 ++++
.../aggregators/TimelineMetricReadHelper.java | 66 ++
.../metrics/timeline/query/Condition.java | 46 +
.../timeline/query/ConnectionProvider.java | 29 +
.../timeline/query/DefaultCondition.java | 258 +++++
.../query/DefaultPhoenixDataSource.java | 77 ++
.../timeline/query/PhoenixTransactSQL.java | 573 +++++++++++
.../query/SplitByMetricNamesCondition.java | 165 ++++
.../TestApplicationHistoryServer.java | 3 +-
.../timeline/AbstractMiniHBaseClusterTest.java | 3 +-
.../AbstractTimelineAggregatorTest.java | 9 +-
.../metrics/timeline/FunctionTest.java | 7 +-
.../timeline/HBaseTimelineMetricStoreTest.java | 5 +-
.../metrics/timeline/ITClusterAggregator.java | 78 +-
.../metrics/timeline/ITMetricAggregator.java | 18 +-
.../timeline/ITPhoenixHBaseAccessor.java | 23 +-
.../metrics/timeline/MetricTestHelper.java | 20 +-
.../timeline/TestMetricHostAggregate.java | 1 +
.../timeline/TestPhoenixTransactSQL.java | 11 +-
.../0.1.0/configuration/ams-site.xml | 8 +
51 files changed, 3130 insertions(+), 2760 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
deleted file mode 100644
index 4af3db7..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
-
-import java.io.File;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
-
-public abstract class AbstractTimelineAggregator implements Runnable {
- protected final PhoenixHBaseAccessor hBaseAccessor;
- private final Log LOG;
-
- private Clock clock;
- protected final long checkpointDelayMillis;
- protected final Integer resultsetFetchSize;
- protected Configuration metricsConf;
-
- public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf) {
- this(hBaseAccessor, metricsConf, new SystemClock());
- }
-
- public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf, Clock clk) {
- this.hBaseAccessor = hBaseAccessor;
- this.metricsConf = metricsConf;
- this.checkpointDelayMillis = SECONDS.toMillis(
- metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
- this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
- this.LOG = LogFactory.getLog(this.getClass());
- this.clock = clk;
- }
-
- @Override
- public void run() {
- LOG.info("Started Timeline aggregator thread @ " + new Date());
- Long SLEEP_INTERVAL = getSleepIntervalMillis();
-
- while (true) {
- long sleepTime = runOnce(SLEEP_INTERVAL);
-
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted, continuing with aggregation.");
- }
- }
- }
-
- /**
- * Access relaxed for tests
- */
- protected long runOnce(Long SLEEP_INTERVAL) {
- long currentTime = clock.getTime();
- long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
- long sleepTime = SLEEP_INTERVAL;
-
- if (lastCheckPointTime != -1) {
- LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
- + ((clock.getTime() - lastCheckPointTime) / 1000)
- + " seconds.");
-
- long startTime = clock.getTime();
- boolean success = doWork(lastCheckPointTime,
- lastCheckPointTime + SLEEP_INTERVAL);
- long executionTime = clock.getTime() - startTime;
- long delta = SLEEP_INTERVAL - executionTime;
-
- if (delta > 0) {
- // Sleep for (configured sleep - time to execute task)
- sleepTime = delta;
- } else {
- // No sleep because last run took too long to execute
- LOG.info("Aggregator execution took too long, " +
- "cancelling sleep. executionTime = " + executionTime);
- sleepTime = 1;
- }
-
- LOG.debug("Aggregator sleep interval = " + sleepTime);
-
- if (success) {
- try {
- // Comment to bug fix:
- // cannot just save lastCheckPointTime + SLEEP_INTERVAL,
- // it has to be verified so it is not a time in the future
- // checkpoint says what was aggregated, and there is no way
- // the future metrics were aggregated!
- saveCheckPoint(Math.min(currentTime, lastCheckPointTime +
- SLEEP_INTERVAL));
- } catch (IOException io) {
- LOG.warn("Error saving checkpoint, restarting aggregation at " +
- "previous checkpoint.");
- }
- }
- }
-
- return sleepTime;
- }
-
- private long readLastCheckpointSavingOnFirstRun(long currentTime) {
- long lastCheckPointTime = -1;
-
- try {
- lastCheckPointTime = readCheckPoint();
- if (isLastCheckPointTooOld(lastCheckPointTime)) {
- LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
- "lastCheckPointTime = " + lastCheckPointTime);
- lastCheckPointTime = -1;
- }
- if (lastCheckPointTime == -1) {
- // Assuming first run, save checkpoint and sleep.
- // Set checkpoint to 2 minutes in the past to allow the
- // agents/collectors to catch up
- LOG.info("Saving checkpoint time on first run." +
- (currentTime - checkpointDelayMillis));
- saveCheckPoint(currentTime - checkpointDelayMillis);
- }
- } catch (IOException io) {
- LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
- }
- return lastCheckPointTime;
- }
-
- private boolean isLastCheckPointTooOld(long checkpoint) {
- // first checkpoint is saved checkpointDelayMillis in the past,
- // so here we also need to take it into account
- return checkpoint != -1 &&
- ((clock.getTime() - checkpoint - checkpointDelayMillis) >
- getCheckpointCutOffIntervalMillis());
- }
-
- protected long readCheckPoint() {
- try {
- File checkpoint = new File(getCheckpointLocation());
- if (checkpoint.exists()) {
- String contents = FileUtils.readFileToString(checkpoint);
- if (contents != null && !contents.isEmpty()) {
- return Long.parseLong(contents);
- }
- }
- } catch (IOException io) {
- LOG.debug(io);
- }
- return -1;
- }
-
- protected void saveCheckPoint(long checkpointTime) throws IOException {
- File checkpoint = new File(getCheckpointLocation());
- if (!checkpoint.exists()) {
- boolean done = checkpoint.createNewFile();
- if (!done) {
- throw new IOException("Could not create checkpoint at location, " +
- getCheckpointLocation());
- }
- }
- FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
- }
-
- /**
- * Read metrics written during the time interval and save the sum and total
- * in the aggregate table.
- *
- * @param startTime Sample start time
- * @param endTime Sample end time
- */
- protected boolean doWork(long startTime, long endTime) {
- LOG.info("Start aggregation cycle @ " + new Date() + ", " +
- "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
-
- boolean success = true;
- PhoenixTransactSQL.Condition condition =
- prepareMetricQueryCondition(startTime, endTime);
-
- Connection conn = null;
- PreparedStatement stmt = null;
- ResultSet rs = null;
-
- try {
- conn = hBaseAccessor.getConnection();
- // FLUME 2. aggregate and ignore the instance
- stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-
- LOG.debug("Query issued @: " + new Date());
- rs = stmt.executeQuery();
- LOG.debug("Query returned @: " + new Date());
-
- aggregate(rs, startTime, endTime);
- LOG.info("End aggregation cycle @ " + new Date());
-
- } catch (SQLException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } catch (IOException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } finally {
- if (rs != null) {
- try {
- rs.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException sql) {
- // Ignore
- }
- }
- }
-
- LOG.info("End aggregation cycle @ " + new Date());
- return success;
- }
-
- protected abstract PhoenixTransactSQL.Condition
- prepareMetricQueryCondition(long startTime, long endTime);
-
- protected abstract void aggregate(ResultSet rs, long startTime, long endTime)
- throws IOException, SQLException;
-
- protected abstract Long getSleepIntervalMillis();
-
- protected abstract Integer getCheckpointCutOffMultiplier();
-
- protected Long getCheckpointCutOffIntervalMillis() {
- return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
- }
-
- protected abstract boolean isDisabled();
-
- protected abstract String getCheckpointLocation();
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AggregatorUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AggregatorUtils.java
deleted file mode 100644
index fbea248..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AggregatorUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import java.util.Map;
-
-/**
- *
- */
-public class AggregatorUtils {
-
- public static double[] calculateAggregates(Map<Long, Double> metricValues) {
- double[] values = new double[4];
- double max = Double.MIN_VALUE;
- double min = Double.MAX_VALUE;
- double sum = 0.0;
- int metricCount = 0;
-
- if (metricValues != null && !metricValues.isEmpty()) {
- for (Double value : metricValues.values()) {
- // TODO: Some nulls in data - need to investigate null values from host
- if (value != null) {
- if (value > max) {
- max = value;
- }
- if (value < min) {
- min = value;
- }
- sum += value;
- }
- }
- metricCount = metricValues.values().size();
- }
- // BR: WHY ZERO is a good idea?
- values[0] = sum;
- values[1] = max != Double.MIN_VALUE ? max : 0.0;
- values[2] = min != Double.MAX_VALUE ? min : 0.0;
- values[3] = metricCount;
-
- return values;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
deleted file mode 100644
index 34da78b..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- *
- */
-public interface ConnectionProvider {
- public Connection getConnection() throws SQLException;
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
deleted file mode 100644
index 47db730..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-public class DefaultPhoenixDataSource implements ConnectionProvider {
-
- static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
- private static final String ZOOKEEPER_CLIENT_PORT =
- "hbase.zookeeper.property.clientPort";
- private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
- private static final String ZNODE_PARENT = "zookeeper.znode.parent";
-
- private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
- private final String url;
-
- public DefaultPhoenixDataSource(Configuration hbaseConf) {
- String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT,
- "2181");
- String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
- String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase");
- if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
- throw new IllegalStateException("Unable to find Zookeeper quorum to " +
- "access HBase store using Phoenix.");
- }
-
- url = String.format(connectionUrl,
- zookeeperQuorum,
- zookeeperClientPort,
- znodeParent);
- }
-
- /**
- * Get JDBC connection to HBase store. Assumption is that the hbase
- * configuration is present on the classpath and loaded by the caller into
- * the Configuration object.
- * Phoenix already caches the HConnection between the client and HBase
- * cluster.
- *
- * @return @java.sql.Connection
- */
- public Connection getConnection() throws SQLException {
-
- LOG.debug("Metric store connection url: " + url);
- try {
- return DriverManager.getConnection(url);
- } catch (SQLException e) {
- LOG.warn("Unable to connect to HBase store using Phoenix.", e);
-
- throw e;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java
deleted file mode 100644
index 11245d8..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-/**
- * Is used to determine metrics aggregate table.
- *
- * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetric
- * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetrics
- */
-public class Function {
- public static Function DEFAULT_VALUE_FUNCTION =
- new Function(ReadFunction.VALUE, null);
- private static final String SUFFIX_SEPARATOR = "\\._";
-
- private ReadFunction readFunction = ReadFunction.VALUE;
- private PostProcessingFunction postProcessingFunction = null;
-
- public Function(){
-
- }
-
- public Function(ReadFunction readFunction,
- PostProcessingFunction ppFunction){
- if (readFunction!=null){
- this.readFunction = readFunction ;
- }
- this.postProcessingFunction = ppFunction;
- }
-
- public static Function fromMetricName(String metricName){
- // gets postprocessing, and aggregation function
- // ex. Metric._rate._avg
- String[] parts = metricName.split(SUFFIX_SEPARATOR);
-
- ReadFunction readFunction = ReadFunction.VALUE;
- PostProcessingFunction ppFunction = null;
-
- if (parts.length == 3) {
- ppFunction = PostProcessingFunction.getFunction(parts[1]);
- readFunction = ReadFunction.getFunction(parts[2]);
- } else if (parts.length == 2) {
- ppFunction = null;
- readFunction = ReadFunction.getFunction(parts[1]);
- }
-
-
- return new Function(readFunction, ppFunction);
- }
-
- public String getSuffix(){
- return (postProcessingFunction == null)? readFunction.getSuffix() :
- postProcessingFunction.getSuffix() + readFunction.getSuffix();
- }
-
- public ReadFunction getReadFunction() {
- return readFunction;
- }
-
- @Override
- public String toString() {
- return "Function{" +
- "readFunction=" + readFunction +
- ", postProcessingFunction=" + postProcessingFunction +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof Function)) return false;
-
- Function function = (Function) o;
-
- return postProcessingFunction == function.postProcessingFunction
- && readFunction == function.readFunction;
-
- }
-
- @Override
- public int hashCode() {
- int result = readFunction.hashCode();
- result = 31 * result + (postProcessingFunction != null ?
- postProcessingFunction.hashCode() : 0);
- return result;
- }
-
- public enum PostProcessingFunction {
- NONE(""),
- RATE("._rate");
-
- PostProcessingFunction(String suffix){
- this.suffix = suffix;
- }
-
- private String suffix = "";
-
- public String getSuffix(){
- return suffix;
- }
-
- public static PostProcessingFunction getFunction(String functionName) throws
- FunctionFormatException {
- if (functionName == null) {
- return NONE;
- }
-
- try {
- return PostProcessingFunction.valueOf(functionName.toUpperCase());
- } catch (IllegalArgumentException e) {
- throw new FunctionFormatException("Function should be value, avg, min, " +
- "max", e);
- }
- }
- }
-
- public enum ReadFunction {
- VALUE(""),
- AVG("._avg"),
- MIN("._min"),
- MAX("._max"),
- SUM("._sum");
-
- private final String suffix;
-
- ReadFunction(String suffix){
- this.suffix = suffix;
- }
-
- public String getSuffix() {
- return suffix;
- }
-
- public static ReadFunction getFunction(String functionName) throws
- FunctionFormatException {
- if (functionName == null) {
- return VALUE;
- }
- try {
- return ReadFunction.valueOf(functionName.toUpperCase());
- } catch (IllegalArgumentException e) {
- throw new FunctionFormatException(
- "Function should be value, avg, min, max. Got " + functionName, e);
- }
- }
- }
-
- public static class FunctionFormatException extends IllegalArgumentException {
- public FunctionFormatException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index a4980b4..1fac404 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -25,6 +25,14 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -33,8 +41,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
public class HBaseTimelineMetricStore extends AbstractService
implements TimelineMetricStore {
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
deleted file mode 100644
index 61e15d7..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.annotate.JsonSubTypes;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.io.IOException;
-
-/**
-*
-*/
-@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
- @JsonSubTypes.Type(value = MetricHostAggregate.class)})
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class MetricAggregate {
- private static final ObjectMapper mapper = new ObjectMapper();
-
- protected Double sum = 0.0;
- protected Double deviation;
- protected Double max = Double.MIN_VALUE;
- protected Double min = Double.MAX_VALUE;
-
- public MetricAggregate() {
- }
-
- MetricAggregate(Double sum, Double deviation, Double max,
- Double min) {
- this.sum = sum;
- this.deviation = deviation;
- this.max = max;
- this.min = min;
- }
-
- void updateSum(Double sum) {
- this.sum += sum;
- }
-
- void updateMax(Double max) {
- if (max > this.max) {
- this.max = max;
- }
- }
-
- void updateMin(Double min) {
- if (min < this.min) {
- this.min = min;
- }
- }
-
- @JsonProperty("sum")
- Double getSum() {
- return sum;
- }
-
- @JsonProperty("deviation")
- Double getDeviation() {
- return deviation;
- }
-
- @JsonProperty("max")
- Double getMax() {
- return max;
- }
-
- @JsonProperty("min")
- Double getMin() {
- return min;
- }
-
- public void setSum(Double sum) {
- this.sum = sum;
- }
-
- public void setDeviation(Double deviation) {
- this.deviation = deviation;
- }
-
- public void setMax(Double max) {
- this.max = max;
- }
-
- public void setMin(Double min) {
- this.min = min;
- }
-
- public String toJSON() throws IOException {
- return mapper.writeValueAsString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
deleted file mode 100644
index c13c85f..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
-*
-*/
-public class MetricClusterAggregate extends MetricAggregate {
- private int numberOfHosts;
-
- @JsonCreator
- public MetricClusterAggregate() {
- }
-
- MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
- Double max, Double min) {
- super(sum, deviation, max, min);
- this.numberOfHosts = numberOfHosts;
- }
-
- @JsonProperty("numberOfHosts")
- int getNumberOfHosts() {
- return numberOfHosts;
- }
-
- void updateNumberOfHosts(int count) {
- this.numberOfHosts += count;
- }
-
- public void setNumberOfHosts(int numberOfHosts) {
- this.numberOfHosts = numberOfHosts;
- }
-
- /**
- * Find and update min, max and avg for a minute
- */
- void updateAggregates(MetricClusterAggregate hostAggregate) {
- updateMax(hostAggregate.getMax());
- updateMin(hostAggregate.getMin());
- updateSum(hostAggregate.getSum());
- updateNumberOfHosts(hostAggregate.getNumberOfHosts());
- }
-
- @Override
- public String toString() {
-// MetricClusterAggregate
- return "MetricAggregate{" +
- "sum=" + sum +
- ", numberOfHosts=" + numberOfHosts +
- ", deviation=" + deviation +
- ", max=" + max +
- ", min=" + min +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
deleted file mode 100644
index 02cc207..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Represents a collection of minute based aggregation of values for
- * resolution greater than a minute.
- */
-public class MetricHostAggregate extends MetricAggregate {
-
- private long numberOfSamples = 0;
-
- @JsonCreator
- public MetricHostAggregate() {
- super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
- }
-
- public MetricHostAggregate(Double sum, int numberOfSamples,
- Double deviation,
- Double max, Double min) {
- super(sum, deviation, max, min);
- this.numberOfSamples = numberOfSamples;
- }
-
- @JsonProperty("numberOfSamples")
- long getNumberOfSamples() {
- return numberOfSamples == 0 ? 1 : numberOfSamples;
- }
-
- void updateNumberOfSamples(long count) {
- this.numberOfSamples += count;
- }
-
- public void setNumberOfSamples(long numberOfSamples) {
- this.numberOfSamples = numberOfSamples;
- }
-
- public double getAvg() {
- return sum / numberOfSamples;
- }
-
- /**
- * Find and update min, max and avg for a minute
- */
- void updateAggregates(MetricHostAggregate hostAggregate) {
- updateMax(hostAggregate.getMax());
- updateMin(hostAggregate.getMin());
- updateSum(hostAggregate.getSum());
- updateNumberOfSamples(hostAggregate.getNumberOfSamples());
- }
-
- @Override
- public String toString() {
- return "MetricHostAggregate{" +
- "sum=" + sum +
- ", numberOfSamples=" + numberOfSamples +
- ", deviation=" + deviation +
- ", max=" + max +
- ", min=" + min +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 6a38517..2e78912 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -24,6 +24,17 @@ import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.codehaus.jackson.map.ObjectMapper;
@@ -41,25 +52,6 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.ALTER_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
@@ -70,6 +62,23 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
/**
* Provides a facade over the Phoenix API to access HBase schema
@@ -77,7 +86,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
public class PhoenixHBaseAccessor {
private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
- private static final TimelineMetricReader timelineMetricReader = new TimelineMetricReader();
+ private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
private final Configuration hbaseConf;
private final Configuration metricsConf;
private final RetryCounterFactory retryCounterFactory;
@@ -151,14 +160,14 @@ public class PhoenixHBaseAccessor {
private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
throws SQLException, IOException {
- TimelineMetric metric = timelineMetricReader
+ TimelineMetric metric = TIMELINE_METRIC_READ_HELPER
.getTimelineMetricCommonsFromResultSet(rs);
metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
return metric;
}
- static TimelineMetric getAggregatedTimelineMetricFromResultSet(
+ public static TimelineMetric getAggregatedTimelineMetricFromResultSet(
ResultSet rs, Function f) throws SQLException, IOException {
TimelineMetric metric = new TimelineMetric();
@@ -214,7 +223,7 @@ public class PhoenixHBaseAccessor {
return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef);
}
- static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
+ public static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
throws SQLException, IOException {
TimelineMetric metric = new TimelineMetric();
metric.setMetricName(rs.getString("METRIC_NAME"));
@@ -226,7 +235,7 @@ public class PhoenixHBaseAccessor {
return metric;
}
- static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
+ public static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
throws SQLException {
MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
@@ -238,7 +247,7 @@ public class PhoenixHBaseAccessor {
return metricHostAggregate;
}
- static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
+ public static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
throws SQLException {
MetricClusterAggregate agg = new MetricClusterAggregate();
agg.setSum(rs.getDouble("METRIC_SUM"));
@@ -474,7 +483,7 @@ public class PhoenixHBaseAccessor {
}
else {
TimelineMetric metric;
- metric = timelineMetricReader.getTimelineMetricFromResultSet(rs);
+ metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
if (condition.isGrouped()) {
metrics.addOrMergeTimelineMetric(metric);
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
deleted file mode 100644
index 2cdefa9..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
+++ /dev/null
@@ -1,970 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Encapsulate all metrics related SQL queries.
- */
-public class PhoenixTransactSQL {
-
- static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
- /**
- * Create table to store individual metric records.
- */
- public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
- "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " +
- "HOSTNAME VARCHAR, " +
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
- "APP_ID VARCHAR, " +
- "INSTANCE_ID VARCHAR, " +
- "START_TIME UNSIGNED_LONG, " +
- "UNITS CHAR(20), " +
- "METRIC_SUM DOUBLE, " +
- "METRIC_COUNT UNSIGNED_INT, " +
- "METRIC_MAX DOUBLE, " +
- "METRIC_MIN DOUBLE, " +
- "METRICS VARCHAR CONSTRAINT pk " +
- "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " +
- "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
- "TTL=%s, COMPRESSION='%s'";
-
- public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
- "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
- "(METRIC_NAME VARCHAR, " +
- "HOSTNAME VARCHAR, " +
- "APP_ID VARCHAR, " +
- "INSTANCE_ID VARCHAR, " +
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
- "UNITS CHAR(20), " +
- "METRIC_SUM DOUBLE," +
- "METRIC_COUNT UNSIGNED_INT, " +
- "METRIC_MAX DOUBLE," +
- "METRIC_MIN DOUBLE CONSTRAINT pk " +
- "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
- "TTL=%s, COMPRESSION='%s'";
-
- public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
- "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
- "(METRIC_NAME VARCHAR, " +
- "HOSTNAME VARCHAR, " +
- "APP_ID VARCHAR, " +
- "INSTANCE_ID VARCHAR, " +
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
- "UNITS CHAR(20), " +
- "METRIC_SUM DOUBLE," +
- "METRIC_COUNT UNSIGNED_INT, " +
- "METRIC_MAX DOUBLE," +
- "METRIC_MIN DOUBLE CONSTRAINT pk " +
- "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
- " COMPRESSION='%s'";
-
- public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
- "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
- "(METRIC_NAME VARCHAR, " +
- "APP_ID VARCHAR, " +
- "INSTANCE_ID VARCHAR, " +
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
- "UNITS CHAR(20), " +
- "METRIC_SUM DOUBLE, " +
- "HOSTS_COUNT UNSIGNED_INT, " +
- "METRIC_MAX DOUBLE, " +
- "METRIC_MIN DOUBLE " +
- "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
- "TTL=%s, COMPRESSION='%s'";
-
- public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
- "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
- "(METRIC_NAME VARCHAR, " +
- "APP_ID VARCHAR, " +
- "INSTANCE_ID VARCHAR, " +
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
- "UNITS CHAR(20), " +
- "METRIC_SUM DOUBLE, " +
- "METRIC_COUNT UNSIGNED_INT, " +
- "METRIC_MAX DOUBLE, " +
- "METRIC_MIN DOUBLE " +
- "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
- "TTL=%s, COMPRESSION='%s'";
-
- /**
- * ALTER table to set new options
- */
- public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s";
-
- /**
- * Insert into metric records table.
- */
- public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
- "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " +
- "UNITS, " +
- "METRIC_SUM, " +
- "METRIC_MAX, " +
- "METRIC_MIN, " +
- "METRIC_COUNT, " +
- "METRICS) VALUES " +
- "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
- public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
- "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
- "UNITS, " +
- "METRIC_SUM, " +
- "HOSTS_COUNT, " +
- "METRIC_MAX, " +
- "METRIC_MIN) " +
- "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
- public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" +
- " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
- "UNITS, " +
- "METRIC_SUM, " +
- "METRIC_COUNT, " +
- "METRIC_MAX, " +
- "METRIC_MIN) " +
- "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
-
- public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
- "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
- "SERVER_TIME, " +
- "UNITS, " +
- "METRIC_SUM, " +
- "METRIC_MAX, " +
- "METRIC_MIN," +
- "METRIC_COUNT) " +
- "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
- /**
- * Retrieve a set of rows from metrics records table.
- */
- public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " +
- "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " +
- "METRIC_SUM, " +
- "METRIC_MAX, " +
- "METRIC_MIN, " +
- "METRIC_COUNT, " +
- "METRICS " +
- "FROM %s";
-
- public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " +
- "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
- "UNITS, " +
- "METRIC_SUM, " +
- "METRIC_MAX, " +
- "METRIC_MIN, " +
- "METRIC_COUNT " +
- "FROM %s";
-
- public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " +
- "METRIC_NAME, APP_ID, " +
- "INSTANCE_ID, SERVER_TIME, " +
- "UNITS, " +
- "METRIC_SUM, " +
- "HOSTS_COUNT, " +
- "METRIC_MAX, " +
- "METRIC_MIN " +
- "FROM %s";
-
- public static final String GET_CLUSTER_AGGREGATE_HOURLY_SQL = "SELECT %s " +
- "METRIC_NAME, APP_ID, " +
- "INSTANCE_ID, SERVER_TIME, " +
- "UNITS, " +
- "METRIC_SUM, " +
- "METRIC_COUNT, " +
- "METRIC_MAX, " +
- "METRIC_MIN " +
- "FROM %s";
-
- public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
- public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
- "METRIC_RECORD_MINUTE";
- public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
- "METRIC_RECORD_HOURLY";
- public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
- "METRIC_AGGREGATE";
- public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
- "METRIC_AGGREGATE_HOURLY";
- public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
- public static final String DEFAULT_ENCODING = "FAST_DIFF";
- public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
- public static final long HOUR = 3600000; // 1 hour
- public static final long DAY = 86400000; // 1 day
-
- /**
- * Filter to optimize HBase scan by using file timestamps. This prevents
- * a full table scan of metric records.
- *
- * @return Phoenix Hint String
- */
- public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
- return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
- }
-
- public static PreparedStatement prepareGetMetricsSqlStmt(
- Connection connection, Condition condition) throws SQLException {
-
- validateConditionIsNotEmpty(condition);
- validateRowCountLimit(condition);
-
- String stmtStr;
- if (condition.getStatement() != null) {
- stmtStr = condition.getStatement();
- } else {
-
- String metricsTable;
- String query;
- if (condition.getPrecision() == null) {
- long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime();
- long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime();
- Long timeRange = endTime - startTime;
- if (timeRange > 5 * DAY) {
- metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
- query = GET_METRIC_AGGREGATE_ONLY_SQL;
- condition.setPrecision(Precision.HOURS);
- } else if (timeRange > 10 * HOUR) {
- metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
- query = GET_METRIC_AGGREGATE_ONLY_SQL;
- condition.setPrecision(Precision.MINUTES);
- } else {
- metricsTable = METRICS_RECORD_TABLE_NAME;
- query = GET_METRIC_SQL;
- condition.setPrecision(Precision.SECONDS);
- }
- } else {
- switch (condition.getPrecision()) {
- case HOURS:
- metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
- query = GET_METRIC_AGGREGATE_ONLY_SQL;
- break;
- case MINUTES:
- metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
- query = GET_METRIC_AGGREGATE_ONLY_SQL;
- break;
- default:
- metricsTable = METRICS_RECORD_TABLE_NAME;
- query = GET_METRIC_SQL;
- }
- }
-
- stmtStr = String.format(query,
- getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA),
- metricsTable);
- }
-
- StringBuilder sb = new StringBuilder(stmtStr);
- sb.append(" WHERE ");
- sb.append(condition.getConditionClause());
- String orderByClause = condition.getOrderByClause(true);
-
- if (orderByClause != null) {
- sb.append(orderByClause);
- } else {
- sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
- }
- if (condition.getLimit() != null) {
- sb.append(" LIMIT ").append(condition.getLimit());
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
- }
- PreparedStatement stmt = connection.prepareStatement(sb.toString());
- int pos = 1;
- if (condition.getMetricNames() != null) {
- for (; pos <= condition.getMetricNames().size(); pos++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
- }
- stmt.setString(pos, condition.getMetricNames().get(pos - 1));
- }
- }
- if (condition.getHostname() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
- }
- stmt.setString(pos++, condition.getHostname());
- }
- if (condition.getAppId() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
- }
- stmt.setString(pos++, condition.getAppId());
- }
- if (condition.getInstanceId() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
- }
- stmt.setString(pos++, condition.getInstanceId());
- }
- if (condition.getStartTime() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
- }
- stmt.setLong(pos++, condition.getStartTime());
- }
- if (condition.getEndTime() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
- }
- stmt.setLong(pos, condition.getEndTime());
- }
- if (condition.getFetchSize() != null) {
- stmt.setFetchSize(condition.getFetchSize());
- }
-
- return stmt;
- }
-
- private static void validateConditionIsNotEmpty(Condition condition) {
- if (condition.isEmpty()) {
- throw new IllegalArgumentException("Condition is empty.");
- }
- }
-
- private static void validateRowCountLimit(Condition condition) {
- if (condition.getMetricNames() == null
- || condition.getMetricNames().size() ==0 ) {
- //aggregator can use empty metrics query
- return;
- }
-
- long range = condition.getEndTime() - condition.getStartTime();
- long rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1;
-
- Precision precision = condition.getPrecision();
- // for minutes and seconds we can use the rowsPerMetric computed based on
- // minutes
- if (precision != null && precision == Precision.HOURS) {
- rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1;
- }
-
- long totalRowsRequested = rowsPerMetric * condition.getMetricNames().size();
- if (totalRowsRequested > PhoenixHBaseAccessor.RESULTSET_LIMIT) {
- throw new IllegalArgumentException("The time range query for " +
- "precision table exceeds row count limit, please query aggregate " +
- "table instead.");
- }
- }
-
- public static PreparedStatement prepareGetLatestMetricSqlStmt(
- Connection connection, Condition condition) throws SQLException {
-
- validateConditionIsNotEmpty(condition);
-
- if (condition.getMetricNames() == null
- || condition.getMetricNames().size() == 0) {
- throw new IllegalArgumentException("Point in time query without " +
- "metric names not supported ");
- }
-
- String stmtStr;
- if (condition.getStatement() != null) {
- stmtStr = condition.getStatement();
- } else {
- stmtStr = String.format(GET_METRIC_SQL,
- "",
- METRICS_RECORD_TABLE_NAME);
- }
-
- StringBuilder sb = new StringBuilder(stmtStr);
- sb.append(" WHERE ");
- sb.append(condition.getConditionClause());
- String orderByClause = condition.getOrderByClause(false);
- if (orderByClause != null) {
- sb.append(orderByClause);
- } else {
- sb.append(" ORDER BY METRIC_NAME DESC, HOSTNAME DESC, SERVER_TIME DESC ");
- }
-
- sb.append(" LIMIT ").append(condition.getMetricNames().size());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
- }
- PreparedStatement stmt = connection.prepareStatement(sb.toString());
- int pos = 1;
- if (condition.getMetricNames() != null) {
- //IGNORE condition limit, set one based on number of metric names
- for (; pos <= condition.getMetricNames().size(); pos++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
- }
- stmt.setString(pos, condition.getMetricNames().get(pos - 1));
- }
- }
- if (condition.getHostname() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
- }
- stmt.setString(pos++, condition.getHostname());
- }
- if (condition.getAppId() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
- }
- stmt.setString(pos++, condition.getAppId());
- }
- if (condition.getInstanceId() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
- }
- stmt.setString(pos++, condition.getInstanceId());
- }
-
- if (condition.getFetchSize() != null) {
- stmt.setFetchSize(condition.getFetchSize());
- }
-
- return stmt;
- }
-
- public static PreparedStatement prepareGetAggregateSqlStmt(
- Connection connection, Condition condition) throws SQLException {
-
- validateConditionIsNotEmpty(condition);
-
- String metricsAggregateTable;
- String queryStmt;
- if (condition.getPrecision() == null) {
- long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime();
- long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime();
- Long timeRange = endTime - startTime;
- if (timeRange > 5 * DAY) {
- metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
- queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL;
- condition.setPrecision(Precision.HOURS);
- } else {
- metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
- queryStmt = GET_CLUSTER_AGGREGATE_SQL;
- condition.setPrecision(Precision.SECONDS);
- }
- } else {
- switch (condition.getPrecision()) {
- case HOURS:
- metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
- queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL;
- break;
- default:
- metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
- queryStmt = GET_CLUSTER_AGGREGATE_SQL;
- }
- }
-
- StringBuilder sb = new StringBuilder(queryStmt);
- sb.append(" WHERE ");
- sb.append(condition.getConditionClause());
- sb.append(" ORDER BY METRIC_NAME, SERVER_TIME");
- if (condition.getLimit() != null) {
- sb.append(" LIMIT ").append(condition.getLimit());
- }
-
- String query = String.format(sb.toString(),
- PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(),
- NATIVE_TIME_RANGE_DELTA), metricsAggregateTable);
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL => " + query + ", condition => " + condition);
- }
- PreparedStatement stmt = connection.prepareStatement(query);
- int pos = 1;
- if (condition.getMetricNames() != null) {
- for (; pos <= condition.getMetricNames().size(); pos++) {
- stmt.setString(pos, condition.getMetricNames().get(pos - 1));
- }
- }
- // TODO: Upper case all strings on POST
- if (condition.getAppId() != null) {
- stmt.setString(pos++, condition.getAppId());
- }
- if (condition.getInstanceId() != null) {
- stmt.setString(pos++, condition.getInstanceId());
- }
- if (condition.getStartTime() != null) {
- stmt.setLong(pos++, condition.getStartTime());
- }
- if (condition.getEndTime() != null) {
- stmt.setLong(pos, condition.getEndTime());
- }
-
- return stmt;
- }
-
- public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt(
- Connection connection, Condition condition) throws SQLException {
-
- validateConditionIsNotEmpty(condition);
-
- String stmtStr;
- if (condition.getStatement() != null) {
- stmtStr = condition.getStatement();
- } else {
- stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, "",
- METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
- }
-
- StringBuilder sb = new StringBuilder(stmtStr);
- sb.append(" WHERE ");
- sb.append(condition.getConditionClause());
- String orderByClause = condition.getOrderByClause(false);
- if (orderByClause != null) {
- sb.append(orderByClause);
- } else {
- sb.append(" ORDER BY METRIC_NAME DESC, SERVER_TIME DESC ");
- }
-
- sb.append(" LIMIT ").append(condition.getMetricNames().size());
-
- String query = sb.toString();
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL: " + query + ", condition: " + condition);
- }
-
- PreparedStatement stmt = connection.prepareStatement(query);
- int pos = 1;
- if (condition.getMetricNames() != null) {
- for (; pos <= condition.getMetricNames().size(); pos++) {
- stmt.setString(pos, condition.getMetricNames().get(pos - 1));
- }
- }
- if (condition.getAppId() != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
- }
- stmt.setString(pos++, condition.getAppId());
- }
- if (condition.getInstanceId() != null) {
- stmt.setString(pos++, condition.getInstanceId());
- }
-
- return stmt;
- }
-
- static interface Condition {
-
- boolean isEmpty();
-
- List<String> getMetricNames();
- boolean isPointInTime();
- boolean isGrouped();
- void setStatement(String statement);
- String getHostname();
- Precision getPrecision();
- void setPrecision(Precision precision);
- String getAppId();
- String getInstanceId();
- StringBuilder getConditionClause();
- String getOrderByClause(boolean asc);
- String getStatement();
- Long getStartTime();
- Long getEndTime();
- Integer getLimit();
- Integer getFetchSize();
- void setFetchSize(Integer fetchSize);
- void addOrderByColumn(String column);
- void setNoLimit();
- }
-
- static class DefaultCondition implements Condition {
- List<String> metricNames;
- String hostname;
- String appId;
- String instanceId;
- Long startTime;
- Long endTime;
- Precision precision;
- Integer limit;
- boolean grouped;
- boolean noLimit = false;
- Integer fetchSize;
- String statement;
- Set<String> orderByColumns = new LinkedHashSet<String>();
-
- DefaultCondition(List<String> metricNames, String hostname, String appId,
- String instanceId, Long startTime, Long endTime, Precision precision,
- Integer limit, boolean grouped) {
- this.metricNames = metricNames;
- this.hostname = hostname;
- this.appId = appId;
- this.instanceId = instanceId;
- this.startTime = startTime;
- this.endTime = endTime;
- this.precision = precision;
- this.limit = limit;
- this.grouped = grouped;
- }
-
- public String getStatement() {
- return statement;
- }
-
- public void setStatement(String statement) {
- this.statement = statement;
- }
-
- public List<String> getMetricNames() {
- return metricNames == null || metricNames.isEmpty() ? null : metricNames;
- }
-
- public StringBuilder getConditionClause() {
- StringBuilder sb = new StringBuilder();
- boolean appendConjunction = false;
- StringBuilder metricsLike = new StringBuilder();
- StringBuilder metricsIn = new StringBuilder();
-
- if (getMetricNames() != null) {
- for (String name : getMetricNames()) {
- if (name.contains("%")) {
- if (metricsLike.length() > 1) {
- metricsLike.append(" OR ");
- }
- metricsLike.append("METRIC_NAME LIKE ?");
- } else {
- if (metricsIn.length() > 0) {
- metricsIn.append(", ");
- }
- metricsIn.append("?");
- }
- }
-
- if (metricsIn.length()>0) {
- sb.append("(METRIC_NAME IN (");
- sb.append(metricsIn);
- sb.append(")");
- appendConjunction = true;
- }
-
- if (metricsLike.length() > 0) {
- if (appendConjunction) {
- sb.append(" OR ");
- } else {
- sb.append("(");
- }
- sb.append(metricsLike);
- appendConjunction = true;
- }
-
- if (appendConjunction) {
- sb.append(")");
- }
- }
-
- appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
- appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
- appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
- appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
- append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
-
- return sb;
- }
-
- protected static boolean append(StringBuilder sb,
- boolean appendConjunction,
- Object value, String str) {
- if (value != null) {
- if (appendConjunction) {
- sb.append(" AND");
- }
-
- sb.append(str);
- appendConjunction = true;
- }
- return appendConjunction;
- }
-
- public String getHostname() {
- return hostname == null || hostname.isEmpty() ? null : hostname;
- }
-
- public Precision getPrecision() {
- return precision;
- }
-
- public void setPrecision(Precision precision) {
- this.precision = precision;
- }
-
- public String getAppId() {
- if (appId != null && !appId.isEmpty()) {
- if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER")) ) {
- return appId.toLowerCase();
- } else {
- return appId;
- }
- }
- return null;
- }
-
- public String getInstanceId() {
- return instanceId == null || instanceId.isEmpty() ? null : instanceId;
- }
-
- /**
- * Convert to millis.
- */
- public Long getStartTime() {
- if (startTime == null) {
- return null;
- } else if (startTime < 9999999999l) {
- return startTime * 1000;
- } else {
- return startTime;
- }
- }
-
- public Long getEndTime() {
- if (endTime == null) {
- return null;
- }
- if (endTime < 9999999999l) {
- return endTime * 1000;
- } else {
- return endTime;
- }
- }
-
- public void setNoLimit() {
- this.noLimit = true;
- }
-
- public Integer getLimit() {
- if (noLimit) {
- return null;
- }
- return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
- }
-
- public boolean isGrouped() {
- return grouped;
- }
-
- public boolean isPointInTime() {
- return getStartTime() == null && getEndTime() == null;
- }
-
- public boolean isEmpty() {
- return (metricNames == null || metricNames.isEmpty())
- && (hostname == null || hostname.isEmpty())
- && (appId == null || appId.isEmpty())
- && (instanceId == null || instanceId.isEmpty())
- && startTime == null
- && endTime == null;
- }
-
- public Integer getFetchSize() {
- return fetchSize;
- }
-
- public void setFetchSize(Integer fetchSize) {
- this.fetchSize = fetchSize;
- }
-
- public void addOrderByColumn(String column) {
- orderByColumns.add(column);
- }
-
- public String getOrderByClause(boolean asc) {
- String orderByStr = " ORDER BY ";
- if (!orderByColumns.isEmpty()) {
- StringBuilder sb = new StringBuilder(orderByStr);
- for (String orderByColumn : orderByColumns) {
- if (sb.length() != orderByStr.length()) {
- sb.append(", ");
- }
- sb.append(orderByColumn);
- if (!asc) {
- sb.append(" DESC");
- }
- }
- sb.append(" ");
- return sb.toString();
- }
- return null;
- }
-
- @Override
- public String toString() {
- return "Condition{" +
- "metricNames=" + metricNames +
- ", hostname='" + hostname + '\'' +
- ", appId='" + appId + '\'' +
- ", instanceId='" + instanceId + '\'' +
- ", startTime=" + startTime +
- ", endTime=" + endTime +
- ", limit=" + limit +
- ", grouped=" + grouped +
- ", orderBy=" + orderByColumns +
- ", noLimit=" + noLimit +
- '}';
- }
- }
-
- static class SplitByMetricNamesCondition implements Condition {
- private final Condition adaptee;
- private String currentMetric;
-
- SplitByMetricNamesCondition(Condition condition){
- this.adaptee = condition;
- }
-
- @Override
- public boolean isEmpty() {
- return adaptee.isEmpty();
- }
-
- @Override
- public List<String> getMetricNames() {
- return Collections.singletonList(currentMetric);
- }
-
- @Override
- public boolean isPointInTime() {
- return adaptee.isPointInTime();
- }
-
- @Override
- public boolean isGrouped() {
- return adaptee.isGrouped();
- }
-
- @Override
- public void setStatement(String statement) {
- adaptee.setStatement(statement);
- }
-
- @Override
- public String getHostname() {
- return adaptee.getHostname();
- }
-
- @Override
- public Precision getPrecision() {
- return adaptee.getPrecision();
- }
-
- @Override
- public void setPrecision(Precision precision) {
- adaptee.setPrecision(precision);
- }
-
- @Override
- public String getAppId() {
- return adaptee.getAppId();
- }
-
- @Override
- public String getInstanceId() {
- return adaptee.getInstanceId();
- }
-
- @Override
- public StringBuilder getConditionClause() {
- StringBuilder sb = new StringBuilder();
- boolean appendConjunction = false;
-
- if (getMetricNames() != null) {
- for (String name : getMetricNames()) {
- if (sb.length() > 1) {
- sb.append(" OR ");
- }
- sb.append("METRIC_NAME = ?");
- }
-
- appendConjunction = true;
- }
-
- appendConjunction = DefaultCondition.append(sb, appendConjunction,
- getHostname(), " HOSTNAME = ?");
- appendConjunction = DefaultCondition.append(sb, appendConjunction,
- getAppId(), " APP_ID = ?");
- appendConjunction = DefaultCondition.append(sb, appendConjunction,
- getInstanceId(), " INSTANCE_ID = ?");
- appendConjunction = DefaultCondition.append(sb, appendConjunction,
- getStartTime(), " SERVER_TIME >= ?");
- DefaultCondition.append(sb, appendConjunction, getEndTime(),
- " SERVER_TIME < ?");
-
- return sb;
- }
-
- @Override
- public String getOrderByClause(boolean asc) {
- return adaptee.getOrderByClause(asc);
- }
-
- @Override
- public String getStatement() {
- return adaptee.getStatement();
- }
-
- @Override
- public Long getStartTime() {
- return adaptee.getStartTime();
- }
-
- @Override
- public Long getEndTime() {
- return adaptee.getEndTime();
- }
-
- @Override
- public Integer getLimit() {
- return adaptee.getLimit();
- }
-
- @Override
- public Integer getFetchSize() {
- return adaptee.getFetchSize();
- }
-
- @Override
- public void setFetchSize(Integer fetchSize) {
- adaptee.setFetchSize(fetchSize);
- }
-
- @Override
- public void addOrderByColumn(String column) {
- adaptee.addOrderByColumn(column);
- }
-
- @Override
- public void setNoLimit() {
- adaptee.setNoLimit();
- }
-
- public List<String> getOriginalMetricNames() {
- return adaptee.getMetricNames();
- }
-
- public void setCurrentMetric(String currentMetric) {
- this.currentMetric = currentMetric;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
deleted file mode 100644
index d227993..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-public class TimelineClusterMetric {
- private String metricName;
- private String appId;
- private String instanceId;
- private long timestamp;
- private String type;
-
- TimelineClusterMetric(String metricName, String appId, String instanceId,
- long timestamp, String type) {
- this.metricName = metricName;
- this.appId = appId;
- this.instanceId = instanceId;
- this.timestamp = timestamp;
- this.type = type;
- }
-
- String getMetricName() {
- return metricName;
- }
-
- String getAppId() {
- return appId;
- }
-
- String getInstanceId() {
- return instanceId;
- }
-
- long getTimestamp() {
- return timestamp;
- }
-
- String getType() { return type; }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TimelineClusterMetric that = (TimelineClusterMetric) o;
-
- if (timestamp != that.timestamp) return false;
- if (appId != null ? !appId.equals(that.appId) : that.appId != null)
- return false;
- if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
- return false;
- if (!metricName.equals(that.metricName)) return false;
-
- return true;
- }
-
- public boolean equalsExceptTime(TimelineClusterMetric metric) {
- if (!metricName.equals(metric.metricName)) return false;
- if (!appId.equals(metric.appId)) return false;
- if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
- return false;
-
- return true;
- }
- @Override
- public int hashCode() {
- int result = metricName.hashCode();
- result = 31 * result + (appId != null ? appId.hashCode() : 0);
- result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
- result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- return "TimelineClusterMetric{" +
- "metricName='" + metricName + '\'' +
- ", appId='" + appId + '\'' +
- ", instanceId='" + instanceId + '\'' +
- ", timestamp=" + timestamp +
- '}';
- }
-}