You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2016/02/24 23:27:07 UTC
ambari git commit: AMBARI-15170. Move logic involving HBase shell
calls to enable normalization and FIFO compaction policy to Java. (swagle)
Repository: ambari
Updated Branches:
refs/heads/trunk 9bad06c0f -> 22ec03b69
AMBARI-15170. Move logic involving HBase shell calls to enable normalization and FIFO compaction policy to Java. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/22ec03b6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/22ec03b6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/22ec03b6
Branch: refs/heads/trunk
Commit: 22ec03b69762a03691b6c76b7cbccaa2cc571852
Parents: 9bad06c
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Wed Feb 24 14:26:35 2016 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Wed Feb 24 14:26:45 2016 -0800
----------------------------------------------------------------------
.../conf/unix/ambari-metrics-collector | 9 +-
.../timeline/HBaseTimelineMetricStore.java | 2 +
.../metrics/timeline/PhoenixHBaseAccessor.java | 100 ++++++++++++++++++-
.../query/DefaultPhoenixDataSource.java | 23 ++++-
.../query/PhoenixConnectionProvider.java | 31 ++++++
.../timeline/query/PhoenixTransactSQL.java | 19 +++-
.../timeline/AbstractMiniHBaseClusterTest.java | 53 ++++++----
.../timeline/ITPhoenixHBaseAccessor.java | 52 +++++++++-
.../timeline/PhoenixHBaseAccessorTest.java | 23 ++++-
9 files changed, 273 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector b/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
index e319d73..a08c7b6 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
+++ b/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
@@ -304,8 +304,6 @@ function start()
if [ $? -ne 0 ]; then
echo "WARNING: Ambari Metrics data model initialization failed."
>&2 echo "WARNING: Ambari Metrics data model initialization failed."
- else
- enable_normalization_fifo
fi
}
@@ -397,7 +395,7 @@ case "$1" in
start)
start
- ;;
+ ;;
stop)
stop
@@ -414,6 +412,9 @@ case "$1" in
restart)
stop
start
- ;;
+ ;;
+ enable_normalization_fifo)
+ enable_normalization_fifo
+ ;;
esac
http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index b5ec6e8..37e4796 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -83,6 +83,8 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor, metricsConf);
metricMetadataManager.initializeMetadata();
+ hBaseAccessor.initPolicies();
+
if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 1c86ebb..09da6bf 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -22,6 +22,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
@@ -38,8 +40,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -101,6 +103,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
@@ -134,18 +137,27 @@ public class PhoenixHBaseAccessor {
private final Configuration hbaseConf;
private final Configuration metricsConf;
private final RetryCounterFactory retryCounterFactory;
- private final ConnectionProvider dataSource;
+ private final PhoenixConnectionProvider dataSource;
private final long outOfBandTimeAllowance;
private final boolean skipBlockCacheForAggregatorsEnabled;
+ static final String HSTORE_COMPACTION_CLASS_KEY =
+ "hbase.hstore.defaultengine.compactionpolicy.class";
+ static final String FIFO_COMPACTION_POLICY_CLASS =
+ "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy";
+ static final String DEFAULT_COMPACTION_POLICY_CLASS =
+ "org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy";
+ static final String BLOCKING_STORE_FILES_KEY =
+ "hbase.hstore.blockingStoreFiles";
+
public PhoenixHBaseAccessor(Configuration hbaseConf,
Configuration metricsConf){
this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
}
PhoenixHBaseAccessor(Configuration hbaseConf,
- Configuration metricsConf,
- ConnectionProvider dataSource) {
+ Configuration metricsConf,
+ PhoenixConnectionProvider dataSource) {
this.hbaseConf = hbaseConf;
this.metricsConf = metricsConf;
RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, RESULTSET_LIMIT);
@@ -215,6 +227,15 @@ public class PhoenixHBaseAccessor {
return dataSource.getConnection();
}
+ /**
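+ * Exposes the data source's HBaseAdmin. Used only by unit tests for now.
+ * @return the {@link HBaseAdmin} supplied by the data source
+ * @throws IOException if the data source fails to provide an HBaseAdmin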
+ */
+ HBaseAdmin getHBaseAdmin() throws IOException {
+ return dataSource.getHBaseAdmin();
+ }
+
protected void initMetricSchema() {
Connection conn = null;
Statement stmt = null;
@@ -333,6 +354,77 @@ public class PhoenixHBaseAccessor {
}
}
+ protected void initPolicies() {
+ boolean enableNormalizer = hbaseConf.getBoolean("hbase.normalizer.enabled", true);
+ boolean enableFifoCompaction = metricsConf.getBoolean("timeline.metrics.hbase.fifo.compaction.enabled", true);
+
+ if (!enableNormalizer && !enableFifoCompaction) {
+ return;
+ }
+
+ HBaseAdmin hBaseAdmin = null;
+ try {
+ hBaseAdmin = dataSource.getHBaseAdmin();
+ } catch (IOException e) {
+ LOG.warn("Unable to initialize HBaseAdmin for setting policies.", e);
+ }
+
+ if (hBaseAdmin != null) {
+ for (String tableName : PHOENIX_TABLES) {
+ try {
+ boolean modifyTable = false;
+ HTableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableName.getBytes());
+
+ if (enableNormalizer &&
+ !tableDescriptor.isNormalizationEnabled()) {
+ tableDescriptor.setNormalizationEnabled(true);
+ LOG.info("Enabling normalizer for " + tableName);
+ modifyTable = true;
+ }
+
+ Map<String, String> config = tableDescriptor.getConfiguration();
+ if (enableFifoCompaction &&
+ !FIFO_COMPACTION_POLICY_CLASS.equals(config.get(HSTORE_COMPACTION_CLASS_KEY))) {
+ tableDescriptor.setConfiguration(HSTORE_COMPACTION_CLASS_KEY,
+ FIFO_COMPACTION_POLICY_CLASS);
+ LOG.info("Setting config property " + HSTORE_COMPACTION_CLASS_KEY +
+ " = " + FIFO_COMPACTION_POLICY_CLASS + " for " + tableName);
+ // Need to set blockingStoreFiles to 1000 for FIFO
+ tableDescriptor.setConfiguration(BLOCKING_STORE_FILES_KEY, "1000");
+ LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY +
+ " = " + 1000 + " for " + tableName);
+ modifyTable = true;
+ }
+ // Set back original policy if fifo disabled
+ if (!enableFifoCompaction &&
+ FIFO_COMPACTION_POLICY_CLASS.equals(config.get(HSTORE_COMPACTION_CLASS_KEY))) {
+ tableDescriptor.setConfiguration(HSTORE_COMPACTION_CLASS_KEY,
+ DEFAULT_COMPACTION_POLICY_CLASS);
+ LOG.info("Setting config property " + HSTORE_COMPACTION_CLASS_KEY +
+ " = " + DEFAULT_COMPACTION_POLICY_CLASS + " for " + tableName);
+ tableDescriptor.setConfiguration(BLOCKING_STORE_FILES_KEY, "300");
+ LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY +
+ " = " + 300 + " for " + tableName);
+ modifyTable = true;
+ }
+
+ // Persist only if anything changed
+ if (modifyTable) {
+ hBaseAdmin.modifyTable(tableName.getBytes(), tableDescriptor);
+ }
+
+ } catch (IOException e) {
+ LOG.error("Failed setting policies for " + tableName, e);
+ }
+ }
+ try {
+ hBaseAdmin.close();
+ } catch (IOException e) {
+ LOG.warn("Exception on HBaseAdmin close.", e);
+ }
+ }
+ }
+
protected String getSplitPointsStr(String splitPoints) {
if (StringUtils.isEmpty(splitPoints.trim())) {
return "";
http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
index 8283f7d..0ea136b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
@@ -21,25 +21,29 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
-public class DefaultPhoenixDataSource implements ConnectionProvider {
+public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
- private static final String ZOOKEEPER_CLIENT_PORT =
- "hbase.zookeeper.property.clientPort";
+ private static final String ZOOKEEPER_CLIENT_PORT ="hbase.zookeeper.property.clientPort";
private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
private static final String ZNODE_PARENT = "zookeeper.znode.parent";
private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
private final String url;
+ private Configuration hbaseConf;
+
public DefaultPhoenixDataSource(Configuration hbaseConf) {
- String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT,
- "2181");
+ this.hbaseConf = hbaseConf;
+ String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181");
String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure");
if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
@@ -54,6 +58,15 @@ public class DefaultPhoenixDataSource implements ConnectionProvider {
}
/**
+ * Get HBaseAdmin for table ops.
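+ * @return an {@link HBaseAdmin} from a connection created with the HBase configuration
+ * @throws IOException if the connection or the admin cannot be created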
+ */
+ public HBaseAdmin getHBaseAdmin() throws IOException {
+ return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin();
+ }
+
+ /**
* Get JDBC connection to HBase store. Assumption is that the hbase
* configuration is present on the classpath and loaded by the caller into
* the Configuration object.
http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
new file mode 100644
index 0000000..cacbcfb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
@@ -0,0 +1,31 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface PhoenixConnectionProvider extends ConnectionProvider {
+ /**
+ * Get HBaseAdmin for the Phoenix connection
+ * @return the HBaseAdmin for the Phoenix connection
+ * @throws IOException if the HBaseAdmin cannot be obtained
+ */
+ HBaseAdmin getHBaseAdmin() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index cd1bfb3..89e8c34 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -19,15 +19,16 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.Collection;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@@ -289,6 +290,18 @@ public class PhoenixTransactSQL {
"METRIC_AGGREGATE_HOURLY";
public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME =
"METRIC_AGGREGATE_DAILY";
+
+ public static final String[] PHOENIX_TABLES = {
+ METRICS_RECORD_TABLE_NAME,
+ METRICS_AGGREGATE_MINUTE_TABLE_NAME,
+ METRICS_AGGREGATE_HOURLY_TABLE_NAME,
+ METRICS_AGGREGATE_DAILY_TABLE_NAME,
+ METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
+ METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
+ METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
+ METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME
+ };
+
public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
public static final String DEFAULT_ENCODING = "FAST_DIFF";
public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index df4fc89..7410e9d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -17,13 +17,16 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -51,7 +54,6 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.LOG;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -63,6 +65,12 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
protected Connection conn;
protected PhoenixHBaseAccessor hdb;
+ public final Log LOG;
+
+ public AbstractMiniHBaseClusterTest() {
+ LOG = LogFactory.getLog(this.getClass());
+ }
+
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> props = getDefaultProps();
@@ -179,21 +187,32 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000);
return
- new PhoenixHBaseAccessor(
- new Configuration(),
- metricsConf,
- new ConnectionProvider() {
- @Override
- public Connection getConnection() {
- Connection connection = null;
- try {
- connection = DriverManager.getConnection(getUrl());
- } catch (SQLException e) {
- LOG.warn("Unable to connect to HBase store using Phoenix.", e);
- }
- return connection;
- }
- });
+ new PhoenixHBaseAccessor(
+ new Configuration(),
+ metricsConf,
+ new PhoenixConnectionProvider() {
+
+ @Override
+ public HBaseAdmin getHBaseAdmin() throws IOException {
+ try {
+ return driver.getConnectionQueryServices(null, null).getAdmin();
+ } catch (SQLException e) {
+ LOG.error(e);
+ }
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() {
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(getUrl());
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+ }
+ return connection;
+ }
+ });
}
protected void insertMetricRecords(Connection conn, TimelineMetrics metrics, long currentTime)
http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 0522f81..e3e037a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -17,7 +17,10 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
@@ -50,7 +53,11 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.DEFAULT_COMPACTION_POLICY_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
@@ -298,13 +305,13 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
records.put(createEmptyTimelineClusterMetric(ctime),
- new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(createEmptyTimelineClusterMetric(ctime += minute),
- new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(createEmptyTimelineClusterMetric(ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(createEmptyTimelineClusterMetric(ctime += minute),
- new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
hdb.saveClusterAggregateRecords(records);
boolean success = agg.doWork(startTime, ctime + minute);
@@ -327,6 +334,45 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
assertEquals(2.0, metric.getMetricValues().values().iterator().next(), 0.00001);
}
+ @Test
+ public void testInitPolicies() throws Exception {
+ HBaseAdmin hBaseAdmin = hdb.getHBaseAdmin();
+
+ // Verify policies are unset
+ for (String tableName : PHOENIX_TABLES) {
+ HTableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableName.getBytes());
+
+ Assert.assertFalse("Normalizer disabled by default.", tableDescriptor.isNormalizationEnabled());
+ Assert.assertNull("Default compaction policy is null.",
+ tableDescriptor.getConfigurationValue(HSTORE_COMPACTION_CLASS_KEY));
+ }
+
+ hdb.initPolicies();
+
+ // Verify expected policies are set
+ boolean normalizerEnabled = false;
+ String compactionPolicy = null;
+ for (int i = 0; i < 10; i++) {
+ LOG.warn("Policy check retry : " + i);
+ for (String tableName : PHOENIX_TABLES) {
+ HTableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableName.getBytes());
+ normalizerEnabled = tableDescriptor.isNormalizationEnabled();
+ compactionPolicy = tableDescriptor.getConfigurationValue(HSTORE_COMPACTION_CLASS_KEY);
+ LOG.debug("Table: " + tableName + ", normalizerEnabled = " + normalizerEnabled);
+ LOG.debug("Table: " + tableName + ", compactionPolicy = " + compactionPolicy);
+ // Best effort: wait 2 seconds for each table whose policies are not visible yet
+ if (!normalizerEnabled || compactionPolicy == null) {
+ Thread.sleep(2000l);
+ }
+ }
+ }
+
+ Assert.assertTrue("Normalizer enabled.", normalizerEnabled);
+ Assert.assertEquals("FIFO compaction policy is set.", FIFO_COMPACTION_POLICY_CLASS, compactionPolicy);
+
+ hBaseAdmin.close();
+ }
+
private Map<String, List<Function>> singletonValueFunctionMap(String metricName) {
return Collections.singletonMap(metricName, Collections.singletonList(new Function()));
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index 9838bca..290a98a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.phoenix.exception.PhoenixIOException;
import org.easymock.EasyMock;
@@ -59,7 +61,12 @@ public class PhoenixHBaseAccessorTest {
hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
Configuration metricsConf = new Configuration();
- ConnectionProvider connectionProvider = new ConnectionProvider() {
+ PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
+ @Override
+ public HBaseAdmin getHBaseAdmin() throws IOException {
+ return null;
+ }
+
@Override
public Connection getConnection() throws SQLException {
return null;
@@ -103,7 +110,12 @@ public class PhoenixHBaseAccessorTest {
hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
Configuration metricsConf = new Configuration();
- ConnectionProvider connectionProvider = new ConnectionProvider() {
+ PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
+ @Override
+ public HBaseAdmin getHBaseAdmin() throws IOException {
+ return null;
+ }
+
@Override
public Connection getConnection() throws SQLException {
return null;
@@ -148,7 +160,12 @@ public class PhoenixHBaseAccessorTest {
hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
Configuration metricsConf = new Configuration();
- ConnectionProvider connectionProvider = new ConnectionProvider() {
+ PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
+ @Override
+ public HBaseAdmin getHBaseAdmin() throws IOException {
+ return null;
+ }
+
@Override
public Connection getConnection() throws SQLException {
return null;