You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2016/02/24 23:27:07 UTC

ambari git commit: AMBARI-15170. Move logic involving HBase shell calls to enable normalization and FIFO compaction policy to Java. (swagle)

Repository: ambari
Updated Branches:
  refs/heads/trunk 9bad06c0f -> 22ec03b69


AMBARI-15170. Move logic involving HBase shell calls to enable normalization and FIFO compaction policy to Java. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/22ec03b6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/22ec03b6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/22ec03b6

Branch: refs/heads/trunk
Commit: 22ec03b69762a03691b6c76b7cbccaa2cc571852
Parents: 9bad06c
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Wed Feb 24 14:26:35 2016 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Wed Feb 24 14:26:45 2016 -0800

----------------------------------------------------------------------
 .../conf/unix/ambari-metrics-collector          |   9 +-
 .../timeline/HBaseTimelineMetricStore.java      |   2 +
 .../metrics/timeline/PhoenixHBaseAccessor.java  | 100 ++++++++++++++++++-
 .../query/DefaultPhoenixDataSource.java         |  23 ++++-
 .../query/PhoenixConnectionProvider.java        |  31 ++++++
 .../timeline/query/PhoenixTransactSQL.java      |  19 +++-
 .../timeline/AbstractMiniHBaseClusterTest.java  |  53 ++++++----
 .../timeline/ITPhoenixHBaseAccessor.java        |  52 +++++++++-
 .../timeline/PhoenixHBaseAccessorTest.java      |  23 ++++-
 9 files changed, 273 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector b/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
index e319d73..a08c7b6 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
+++ b/ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
@@ -304,8 +304,6 @@ function start()
   if [ $? -ne 0 ]; then
     echo "WARNING: Ambari Metrics data model initialization failed."
      >&2 echo "WARNING: Ambari Metrics data model initialization failed."
-  else
-    enable_normalization_fifo
   fi
   }
 
@@ -397,7 +395,7 @@ case "$1" in
 	start)
     start
 
-  ;;
+    ;;
 	stop)
     stop
 
@@ -414,6 +412,9 @@ case "$1" in
 	restart)
 	  stop
 	  start
-	;;
+	  ;;
+  enable_normalization_fifo)
+    enable_normalization_fifo
+    ;;
 
 esac

http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index b5ec6e8..37e4796 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -83,6 +83,8 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
       metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor, metricsConf);
       metricMetadataManager.initializeMetadata();
 
+      hBaseAccessor.initPolicies();
+
       if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
         LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
       }

http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 1c86ebb..09da6bf 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -22,6 +22,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
@@ -38,8 +40,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -101,6 +103,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
@@ -134,18 +137,27 @@ public class PhoenixHBaseAccessor {
   private final Configuration hbaseConf;
   private final Configuration metricsConf;
   private final RetryCounterFactory retryCounterFactory;
-  private final ConnectionProvider dataSource;
+  private final PhoenixConnectionProvider dataSource;
   private final long outOfBandTimeAllowance;
   private final boolean skipBlockCacheForAggregatorsEnabled;
 
+  static final String HSTORE_COMPACTION_CLASS_KEY =
+    "hbase.hstore.defaultengine.compactionpolicy.class";
+  static final String FIFO_COMPACTION_POLICY_CLASS =
+    "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy";
+  static final String DEFAULT_COMPACTION_POLICY_CLASS =
+    "org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy";
+  static final String BLOCKING_STORE_FILES_KEY =
+    "hbase.hstore.blockingStoreFiles";
+
   public PhoenixHBaseAccessor(Configuration hbaseConf,
                               Configuration metricsConf){
     this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
   }
 
   PhoenixHBaseAccessor(Configuration hbaseConf,
-                              Configuration metricsConf,
-                              ConnectionProvider dataSource) {
+                       Configuration metricsConf,
+                       PhoenixConnectionProvider dataSource) {
     this.hbaseConf = hbaseConf;
     this.metricsConf = metricsConf;
     RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, RESULTSET_LIMIT);
@@ -215,6 +227,15 @@ public class PhoenixHBaseAccessor {
     return dataSource.getConnection();
   }
 
+  /**
+   * Unit test purpose only for now.
+   * @return @HBaseAdmin
+   * @throws IOException
+   */
+  HBaseAdmin getHBaseAdmin() throws IOException {
+    return dataSource.getHBaseAdmin();
+  }
+
   protected void initMetricSchema() {
     Connection conn = null;
     Statement stmt = null;
@@ -333,6 +354,77 @@ public class PhoenixHBaseAccessor {
     }
   }
 
+  protected void initPolicies() {
+    boolean enableNormalizer = hbaseConf.getBoolean("hbase.normalizer.enabled", true);
+    boolean enableFifoCompaction = metricsConf.getBoolean("timeline.metrics.hbase.fifo.compaction.enabled", true);
+
+    if (!enableNormalizer && !enableFifoCompaction) {
+      return;
+    }
+
+    HBaseAdmin hBaseAdmin = null;
+    try {
+      hBaseAdmin = dataSource.getHBaseAdmin();
+    } catch (IOException e) {
+      LOG.warn("Unable to initialize HBaseAdmin for setting policies.", e);
+    }
+
+    if (hBaseAdmin != null) {
+      for (String tableName : PHOENIX_TABLES) {
+        try {
+          boolean modifyTable = false;
+          HTableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableName.getBytes());
+
+          if (enableNormalizer &&
+              !tableDescriptor.isNormalizationEnabled()) {
+            tableDescriptor.setNormalizationEnabled(true);
+            LOG.info("Enabling normalizer for " + tableName);
+            modifyTable = true;
+          }
+
+          Map<String, String> config = tableDescriptor.getConfiguration();
+          if (enableFifoCompaction &&
+             !FIFO_COMPACTION_POLICY_CLASS.equals(config.get(HSTORE_COMPACTION_CLASS_KEY))) {
+            tableDescriptor.setConfiguration(HSTORE_COMPACTION_CLASS_KEY,
+              FIFO_COMPACTION_POLICY_CLASS);
+            LOG.info("Setting config property " + HSTORE_COMPACTION_CLASS_KEY +
+              " = " + FIFO_COMPACTION_POLICY_CLASS + " for " + tableName);
+            // Need to set blockingStoreFiles to 1000 for FIFO
+            tableDescriptor.setConfiguration(BLOCKING_STORE_FILES_KEY, "1000");
+            LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY +
+              " = " + 1000 + " for " + tableName);
+            modifyTable = true;
+          }
+          // Set back original policy if fifo disabled
+          if (!enableFifoCompaction &&
+             FIFO_COMPACTION_POLICY_CLASS.equals(config.get(HSTORE_COMPACTION_CLASS_KEY))) {
+            tableDescriptor.setConfiguration(HSTORE_COMPACTION_CLASS_KEY,
+              DEFAULT_COMPACTION_POLICY_CLASS);
+            LOG.info("Setting config property " + HSTORE_COMPACTION_CLASS_KEY +
+              " = " + DEFAULT_COMPACTION_POLICY_CLASS + " for " + tableName);
+            tableDescriptor.setConfiguration(BLOCKING_STORE_FILES_KEY, "300");
+            LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY +
+              " = " + 300 + " for " + tableName);
+            modifyTable = true;
+          }
+
+          // Persist only if anything changed
+          if (modifyTable) {
+            hBaseAdmin.modifyTable(tableName.getBytes(), tableDescriptor);
+          }
+
+        } catch (IOException e) {
+          LOG.error("Failed setting policies for " + tableName, e);
+        }
+      }
+      try {
+        hBaseAdmin.close();
+      } catch (IOException e) {
+        LOG.warn("Exception on HBaseAdmin close.", e);
+      }
+    }
+  }
+
   protected String getSplitPointsStr(String splitPoints) {
     if (StringUtils.isEmpty(splitPoints.trim())) {
       return "";

http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
index 8283f7d..0ea136b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
@@ -21,25 +21,29 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 
-public class DefaultPhoenixDataSource implements ConnectionProvider {
+public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
 
   static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
-  private static final String ZOOKEEPER_CLIENT_PORT =
-    "hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_CLIENT_PORT ="hbase.zookeeper.property.clientPort";
   private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
   private static final String ZNODE_PARENT = "zookeeper.znode.parent";
 
   private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
   private final String url;
 
+  private Configuration hbaseConf;
+
   public DefaultPhoenixDataSource(Configuration hbaseConf) {
-    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT,
-      "2181");
+    this.hbaseConf = hbaseConf;
+    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181");
     String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
     String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure");
     if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
@@ -54,6 +58,15 @@ public class DefaultPhoenixDataSource implements ConnectionProvider {
   }
 
   /**
+   * Get HBaseAdmin for table ops.
+   * @return @HBaseAdmin
+   * @throws IOException
+   */
+  public HBaseAdmin getHBaseAdmin() throws IOException {
+    return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin();
+  }
+
+  /**
    * Get JDBC connection to HBase store. Assumption is that the hbase
    * configuration is present on the classpath and loaded by the caller into
    * the Configuration object.

http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
new file mode 100644
index 0000000..cacbcfb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
@@ -0,0 +1,31 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface PhoenixConnectionProvider extends ConnectionProvider {
+  /**
+   * Get HBaseAdmin for the Phoenix connection
+   * @return
+   * @throws IOException
+   */
+  HBaseAdmin getHBaseAdmin() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index cd1bfb3..89e8c34 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -19,15 +19,16 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.Collection;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -289,6 +290,18 @@ public class PhoenixTransactSQL {
     "METRIC_AGGREGATE_HOURLY";
   public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME =
     "METRIC_AGGREGATE_DAILY";
+
+  public static final String[] PHOENIX_TABLES = {
+    METRICS_RECORD_TABLE_NAME,
+    METRICS_AGGREGATE_MINUTE_TABLE_NAME,
+    METRICS_AGGREGATE_HOURLY_TABLE_NAME,
+    METRICS_AGGREGATE_DAILY_TABLE_NAME,
+    METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
+    METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
+    METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
+    METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME
+  };
+
   public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
   public static final String DEFAULT_ENCODING = "FAST_DIFF";
   public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes

http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index df4fc89..7410e9d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -17,13 +17,16 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -51,7 +54,6 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.LOG;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -63,6 +65,12 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
   protected Connection conn;
   protected PhoenixHBaseAccessor hdb;
 
+  public final Log LOG;
+
+  public AbstractMiniHBaseClusterTest() {
+    LOG = LogFactory.getLog(this.getClass());
+  }
+
   @BeforeClass
   public static void doSetup() throws Exception {
     Map<String, String> props = getDefaultProps();
@@ -179,21 +187,32 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
     metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000);
 
     return
-        new PhoenixHBaseAccessor(
-            new Configuration(),
-            metricsConf,
-            new ConnectionProvider() {
-              @Override
-              public Connection getConnection() {
-                Connection connection = null;
-                try {
-                  connection = DriverManager.getConnection(getUrl());
-                } catch (SQLException e) {
-                  LOG.warn("Unable to connect to HBase store using Phoenix.", e);
-                }
-                return connection;
-              }
-            });
+      new PhoenixHBaseAccessor(
+        new Configuration(),
+        metricsConf,
+        new PhoenixConnectionProvider() {
+
+          @Override
+          public HBaseAdmin getHBaseAdmin() throws IOException {
+            try {
+              return driver.getConnectionQueryServices(null, null).getAdmin();
+            } catch (SQLException e) {
+              LOG.error(e);
+            }
+            return null;
+          }
+
+          @Override
+          public Connection getConnection() {
+            Connection connection = null;
+            try {
+              connection = DriverManager.getConnection(getUrl());
+            } catch (SQLException e) {
+              LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+            }
+            return connection;
+          }
+        });
   }
 
   protected void insertMetricRecords(Connection conn, TimelineMetrics metrics, long currentTime)

http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 0522f81..e3e037a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
@@ -50,7 +53,11 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.DEFAULT_COMPACTION_POLICY_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
 
 
 public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
@@ -298,13 +305,13 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
         new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
 
     records.put(createEmptyTimelineClusterMetric(ctime),
-        new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
     records.put(createEmptyTimelineClusterMetric(ctime += minute),
-        new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
     records.put(createEmptyTimelineClusterMetric(ctime += minute),
         new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
     records.put(createEmptyTimelineClusterMetric(ctime += minute),
-        new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
 
     hdb.saveClusterAggregateRecords(records);
     boolean success = agg.doWork(startTime, ctime + minute);
@@ -327,6 +334,45 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     assertEquals(2.0, metric.getMetricValues().values().iterator().next(), 0.00001);
   }
 
+  @Test
+  public void testInitPolicies() throws Exception {
+    HBaseAdmin hBaseAdmin = hdb.getHBaseAdmin();
+
+    // Verify policies are unset
+    for (String tableName : PHOENIX_TABLES) {
+      HTableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableName.getBytes());
+
+      Assert.assertFalse("Normalizer disabled by default.", tableDescriptor.isNormalizationEnabled());
+      Assert.assertNull("Default compaction policy is null.",
+        tableDescriptor.getConfigurationValue(HSTORE_COMPACTION_CLASS_KEY));
+    }
+
+    hdb.initPolicies();
+
+    // Verify expected policies are set
+    boolean normalizerEnabled = false;
+    String compactionPolicy = null;
+    for (int i = 0; i < 10; i++) {
+      LOG.warn("Policy check retry : " + i);
+      for (String tableName : PHOENIX_TABLES) {
+        HTableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableName.getBytes());
+        normalizerEnabled = tableDescriptor.isNormalizationEnabled();
+        compactionPolicy = tableDescriptor.getConfigurationValue(HSTORE_COMPACTION_CLASS_KEY);
+        LOG.debug("Table: " + tableName + ", normalizerEnabled = " + normalizerEnabled);
+        LOG.debug("Table: " + tableName + ", compactionPolicy = " + compactionPolicy);
+        // Best effort for 20 seconds
+        if (!normalizerEnabled || compactionPolicy == null) {
+          Thread.sleep(2000l);
+        }
+      }
+    }
+
+    Assert.assertTrue("Normalizer enabled.", normalizerEnabled);
+    Assert.assertEquals("FIFO compaction policy is set.", FIFO_COMPACTION_POLICY_CLASS, compactionPolicy);
+
+    hBaseAdmin.close();
+  }
+
   private Map<String, List<Function>> singletonValueFunctionMap(String metricName) {
     return Collections.singletonMap(metricName, Collections.singletonList(new Function()));
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/22ec03b6/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index 9838bca..290a98a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.easymock.EasyMock;
@@ -59,7 +61,12 @@ public class PhoenixHBaseAccessorTest {
     hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
     Configuration metricsConf = new Configuration();
 
-    ConnectionProvider connectionProvider = new ConnectionProvider() {
+    PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
+      @Override
+      public HBaseAdmin getHBaseAdmin() throws IOException {
+        return null;
+      }
+
       @Override
       public Connection getConnection() throws SQLException {
         return null;
@@ -103,7 +110,12 @@ public class PhoenixHBaseAccessorTest {
     hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
     Configuration metricsConf = new Configuration();
 
-    ConnectionProvider connectionProvider = new ConnectionProvider() {
+    PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
+      @Override
+      public HBaseAdmin getHBaseAdmin() throws IOException {
+        return null;
+      }
+
       @Override
       public Connection getConnection() throws SQLException {
         return null;
@@ -148,7 +160,12 @@ public class PhoenixHBaseAccessorTest {
     hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
     Configuration metricsConf = new Configuration();
 
-    ConnectionProvider connectionProvider = new ConnectionProvider() {
+    PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
+      @Override
+      public HBaseAdmin getHBaseAdmin() throws IOException {
+        return null;
+      }
+
       @Override
       public Connection getConnection() throws SQLException {
         return null;