Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/05/16 21:51:24 UTC

[2/7] PHOENIX-982 Avoid spinning up and tearing down mini cluster in tests (SamarthJain)
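
The heart of the change is organizational: tests that genuinely need a private mini cluster are tagged with the NeedsOwnMiniClusterTest JUnit category so the build can schedule them apart from tests that share one. As a minimal sketch of the mechanism, assuming only stock JUnit 4 (the interface and class names below are illustrative, not from the patch):

    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    // JUnit categories are empty marker interfaces used purely as labels,
    // so a build can include or exclude whole groups of tests.
    interface OwnClusterCategorySketch {}

    @Category(OwnClusterCategorySketch.class)
    public class CategorizedSketchIT {
        @Test
        public void taggedTest() {
            // The category is metadata only; the test body runs normally and
            // can be selected or skipped by a Categories suite or build plugin.
        }
    }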

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java
index 0fe696d..1d0dbfd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/FailForUnsupportedHBaseVersionsIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.hbase.index;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -33,126 +34,129 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.VersionInfo;
-import org.apache.phoenix.util.ConfigUtil;
-import org.junit.Test;
-
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.hbase.index.covered.example.ColumnGroup;
 import org.apache.phoenix.hbase.index.covered.example.CoveredColumn;
 import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexSpecifierBuilder;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * Test that we correctly fail for versions of HBase that don't support current properties
  */
+@Category(NeedsOwnMiniClusterTest.class)
 public class FailForUnsupportedHBaseVersionsIT {
-  private static final Log LOG = LogFactory.getLog(FailForUnsupportedHBaseVersionsIT.class);
-
-  /**
-   * We don't support WAL Compression for HBase < 0.94.9, so we shouldn't even allow the server
-   * to start if both indexing and WAL Compression are enabled for the wrong versions.
-   */
-  @Test
-  public void testDoesNotSupportCompressedWAL() {
-    Configuration conf = HBaseConfiguration.create();
-    IndexTestingUtils.setupConfig(conf);
-    // get the current version
-    String version = VersionInfo.getVersion();
-    
-    // ensure WAL Compression not enabled
-    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
-    
-    //we support all versions without WAL Compression
-    String supported = Indexer.validateVersion(version, conf);
-    assertNull(
-      "WAL Compression wasn't enabled, but version "+version+" of HBase wasn't supported! All versions should"
-          + " support writing without a compressed WAL. Message: "+supported, supported);
-
-    // enable WAL Compression
-    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
-
-    // set the version to something we know isn't supported
-    version = "0.94.4";
-    supported = Indexer.validateVersion(version, conf);
-    assertNotNull("WAL Compression was enabled, but incorrectly marked version as supported",
-      supported);
-    
-    //make sure the first version of 0.94 that supports Indexing + WAL Compression works
-    version = "0.94.9";
-    supported = Indexer.validateVersion(version, conf);
-    assertNull(
-      "WAL Compression was enabled, but version "+version+" of HBase wasn't supported! Message: "+supported, supported);
-    
-    //make sure we support snapshot builds too
-    version = "0.94.9-SNAPSHOT";
-    supported = Indexer.validateVersion(version, conf);
-    assertNull(
-      "WAL Compression was enabled, but version "+version+" of HBase wasn't supported! Message: "+supported, supported);
-  }
-
-  /**
-   * Test that we correctly abort a RegionServer when we run tests with an unsupported HBase
-   * version. The 'completeness' of this test requires that we run it against both a version of
-   * HBase that wouldn't be supported with WAL Compression and one that would. Currently, the
-   * default version (0.94.4) is unsupported, so just running 'mvn test' will run the full test.
-   * However, this test will not fail when running against a version of HBase that supports WAL
-   * Compression. Therefore, to fully test this functionality, we need to run the test against
-   * both a supported and an unsupported version of HBase (for as long as we want to support a
-   * version of HBase that doesn't support custom WAL Codecs).
-   * @throws Exception on failure
-   */
-  @Test(timeout = 300000 /* 5 mins */)
-  public void testDoesNotStartRegionServerForUnsupportedCompressionAndVersion() throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    IndexTestingUtils.setupConfig(conf);
-    // enable WAL Compression
-    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
-
-    // check the version to see if it isn't supported
-    String version = VersionInfo.getVersion();
-    boolean supported = false;
-    if (Indexer.validateVersion(version, conf) == null) {
-      supported = true;
+    private static final Log LOG = LogFactory.getLog(FailForUnsupportedHBaseVersionsIT.class);
+
+    /**
+     * We don't support WAL Compression for HBase < 0.94.9, so we shouldn't even allow the server
+     * to start if both indexing and WAL Compression are enabled for the wrong versions.
+     */
+    @Test
+    public void testDoesNotSupportCompressedWAL() {
+        Configuration conf = HBaseConfiguration.create();
+        IndexTestingUtils.setupConfig(conf);
+        // get the current version
+        String version = VersionInfo.getVersion();
+
+        // ensure WAL Compression not enabled
+        conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+
+        //we support all versions without WAL Compression
+        String supported = Indexer.validateVersion(version, conf);
+        assertNull(
+                "WAL Compression wasn't enabled, but version "+version+" of HBase wasn't supported! All versions should"
+                        + " support writing without a compressed WAL. Message: "+supported, supported);
+
+        // enable WAL Compression
+        conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+
+        // set the version to something we know isn't supported
+        version = "0.94.4";
+        supported = Indexer.validateVersion(version, conf);
+        assertNotNull("WAL Compression was enabled, but incorrectly marked version as supported",
+                supported);
+
+        //make sure the first version of 0.94 that supports Indexing + WAL Compression works
+        version = "0.94.9";
+        supported = Indexer.validateVersion(version, conf);
+        assertNull(
+                "WAL Compression was enabled, but version "+version+" of HBase wasn't supported! Message: "+supported, supported);
+
+        //make sure we support snapshot builds too
+        version = "0.94.9-SNAPSHOT";
+        supported = Indexer.validateVersion(version, conf);
+        assertNull(
+                "WAL Compression was enabled, but version "+version+" of HBase wasn't supported! Message: "+supported, supported);
     }
 
-    // start the minicluster
-    HBaseTestingUtility util = new HBaseTestingUtility(conf);
-    // set replication required parameter
-    ConfigUtil.setReplicationConfigIfAbsent(conf);
-    util.startMiniCluster();
-
-    // setup the primary table
-    HTableDescriptor desc = new HTableDescriptor(
-        "testDoesNotStartRegionServerForUnsupportedCompressionAndVersion");
-    byte[] family = Bytes.toBytes("f");
-    desc.addFamily(new HColumnDescriptor(family));
-
-    // enable indexing to a non-existent index table
-    String indexTableName = "INDEX_TABLE";
-    ColumnGroup fam1 = new ColumnGroup(indexTableName);
-    fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
-    CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
-    builder.addIndexGroup(fam1);
-    builder.build(desc);
-
-    // get a reference to the regionserver, so we can ensure it aborts
-    HRegionServer server = util.getMiniHBaseCluster().getRegionServer(0);
-
-    // create the primary table
-    HBaseAdmin admin = util.getHBaseAdmin();
-    if (supported) {
-      admin.createTable(desc);
-      assertFalse("Hosting region server failed, even though the HBase version (" + version
-          + ") supports WAL Compression.", server.isAborted());
-    } else {
-      admin.createTableAsync(desc, null);
-
-      // wait for the regionserver to abort - if this doesn't occur within the timeout, assume it's
-      // broken.
-      while (!server.isAborted()) {
-        LOG.debug("Waiting on regionserver to abort..");
-      }
+    /**
+     * Test that we correctly abort a RegionServer when we run tests with an unsupported HBase
+     * version. The 'completeness' of this test requires that we run it against both a version of
+     * HBase that wouldn't be supported with WAL Compression and one that would. Currently, the
+     * default version (0.94.4) is unsupported, so just running 'mvn test' will run the full test.
+     * However, this test will not fail when running against a version of HBase that supports WAL
+     * Compression. Therefore, to fully test this functionality, we need to run the test against
+     * both a supported and an unsupported version of HBase (for as long as we want to support a
+     * version of HBase that doesn't support custom WAL Codecs).
+     * @throws Exception on failure
+     */
+    @Test(timeout = 300000 /* 5 mins */)
+    public void testDoesNotStartRegionServerForUnsupportedCompressionAndVersion() throws Exception {
+        Configuration conf = HBaseConfiguration.create();
+        setUpConfigForMiniCluster(conf);
+        IndexTestingUtils.setupConfig(conf);
+        // enable WAL Compression
+        conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+
+        // check the version to see if it isn't supported
+        String version = VersionInfo.getVersion();
+        boolean supported = false;
+        if (Indexer.validateVersion(version, conf) == null) {
+            supported = true;
+        }
+
+        // start the minicluster
+        HBaseTestingUtility util = new HBaseTestingUtility(conf);
+        util.startMiniCluster();
+
+        try {
+            // setup the primary table
+            HTableDescriptor desc = new HTableDescriptor(
+                    "testDoesNotStartRegionServerForUnsupportedCompressionAndVersion");
+            byte[] family = Bytes.toBytes("f");
+            desc.addFamily(new HColumnDescriptor(family));
+
+            // enable indexing to a non-existent index table
+            String indexTableName = "INDEX_TABLE";
+            ColumnGroup fam1 = new ColumnGroup(indexTableName);
+            fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
+            CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
+            builder.addIndexGroup(fam1);
+            builder.build(desc);
+
+            // get a reference to the regionserver, so we can ensure it aborts
+            HRegionServer server = util.getMiniHBaseCluster().getRegionServer(0);
+
+            // create the primary table
+            HBaseAdmin admin = util.getHBaseAdmin();
+            if (supported) {
+                admin.createTable(desc);
+                assertFalse("Hosting region server failed, even though the HBase version (" + version
+                        + ") supports WAL Compression.", server.isAborted());
+            } else {
+                admin.createTableAsync(desc, null);
+
+                // wait for the regionserver to abort - if this doesn't occur within the timeout, assume it's
+                // broken.
+                while (!server.isAborted()) {
+                    LOG.debug("Waiting on regionserver to abort..");
+                }
+            }
+
+        } finally {
+            // cleanup
+            util.shutdownMiniCluster();
+        }
     }
-
-    // cleanup
-    util.shutdownMiniCluster();
-  }
 }
\ No newline at end of file
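
The try/finally around startMiniCluster()/shutdownMiniCluster() above is the pattern the patch applies wherever a test must own its cluster: start it, exercise it, and always tear it down. For class-scoped tests the same lifecycle is typically hoisted into @BeforeClass/@AfterClass; a minimal sketch, assuming nothing beyond the HBase test utility (class name illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;

    public class OwnClusterLifecycleSketchIT {
        private static HBaseTestingUtility util;

        @BeforeClass
        public static void startCluster() throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // In the patch, shared tuning is applied first via
            // BaseTest.setUpConfigForMiniCluster(conf).
            util = new HBaseTestingUtility(conf);
            util.startMiniCluster();
        }

        @AfterClass
        public static void stopCluster() throws Exception {
            // Always shut down, even after failures, so the next test class
            // can bring up its own cluster cleanly.
            if (util != null) {
                util.shutdownMiniCluster();
            }
        }
    }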

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index 26515ec..fc134a3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.hbase.index.covered;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -43,23 +44,24 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.ConfigUtil;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.hbase.index.IndexTestingUtils;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.TableName;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * End-to-End test of just the {@link CoveredColumnsIndexBuilder}, but with a simple
  * {@link IndexCodec} and BatchCache implementation.
  */
+@Category(NeedsOwnMiniClusterTest.class)
 public class EndToEndCoveredColumnsIndexBuilderIT {
 
   public class TestState {
@@ -96,12 +98,11 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
   @BeforeClass
   public static void setupCluster() throws Exception {
     Configuration conf = UTIL.getConfiguration();
+    setUpConfigForMiniCluster(conf);
     IndexTestingUtils.setupConfig(conf);
     // disable version checking, so we can test against whatever version of HBase happens to be
     // installed (right now, its generally going to be SNAPSHOT versions).
     conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
-    // set replication required parameter
-    ConfigUtil.setReplicationConfigIfAbsent(conf);
     UTIL.startMiniCluster();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
index 0fb06f5..5196031 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.hbase.index.covered.example;
 
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,22 +43,23 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.util.ConfigUtil;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.hbase.index.IndexTestingUtils;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.TableName;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-
-import org.apache.phoenix.hbase.index.IndexTestingUtils;
-import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.TableName;
+import org.junit.experimental.categories.Category;
 
 /**
  * Test Covered Column indexing in an 'end-to-end' manner on a minicluster. This covers cases where
  * we manage custom timestamped updates that arrive in and out of order as well as just using the
  * generically timestamped updates.
  */
+@Category(NeedsOwnMiniClusterTest.class)
 public class EndToEndCoveredIndexingIT {
   private static final Log LOG = LogFactory.getLog(EndToEndCoveredIndexingIT.class);
   protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -101,12 +104,11 @@ public class EndToEndCoveredIndexingIT {
   @BeforeClass
   public static void setupCluster() throws Exception {
     Configuration conf = UTIL.getConfiguration();
+    setUpConfigForMiniCluster(conf);
     IndexTestingUtils.setupConfig(conf);
     // disable version checking, so we can test against whatever version of HBase happens to be
     // installed (right now, its generally going to be SNAPSHOT versions).
     conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
-    // set replication required parameter
-    ConfigUtil.setReplicationConfigIfAbsent(conf);
     UTIL.startMiniCluster();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndtoEndIndexingWithCompressionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndtoEndIndexingWithCompressionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndtoEndIndexingWithCompressionIT.java
index 4fc5c16..38aeb88 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndtoEndIndexingWithCompressionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndtoEndIndexingWithCompressionIT.java
@@ -17,25 +17,30 @@
  */
 package org.apache.phoenix.hbase.index.covered.example;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.phoenix.util.ConfigUtil;
-import org.junit.BeforeClass;
-
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.hbase.index.IndexTestingUtils;
 import org.apache.phoenix.hbase.index.Indexer;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
 
 /**
  * Test secondary indexing from an end-to-end perspective (client to server to index table).
  */
+
+@Category(NeedsOwnMiniClusterTest.class)
 public class EndtoEndIndexingWithCompressionIT extends EndToEndCoveredIndexingIT {
 
   @BeforeClass
   public static void setupCluster() throws Exception {
     //add our codec and enable WAL compression
     Configuration conf = UTIL.getConfiguration();
+    setUpConfigForMiniCluster(conf);
     IndexTestingUtils.setupConfig(conf);
     // disable version checking, so we can test against whatever version of HBase happens to be
     // installed (right now, its generally going to be SNAPSHOT versions).
@@ -43,8 +48,6 @@ public class EndtoEndIndexingWithCompressionIT extends EndToEndCoveredIndexingIT
     conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY,
     IndexedWALEditCodec.class.getName());
     conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
-    // set replication required parameter
-    ConfigUtil.setReplicationConfigIfAbsent(conf);
     //start the mini-cluster
     UTIL.startMiniCluster();
   }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
index 7310709..6a79e31 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.hbase.index.covered.example;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -34,24 +35,26 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.hbase.index.IndexTestingUtils;
+import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.TableName;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.BaseIndexCodec;
-import org.apache.phoenix.util.ConfigUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String}
  * constructor), {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize
 * the exception, and will just return <tt>null</tt> to the client, which then goes and retries.
  */
+@Category(NeedsOwnMiniClusterTest.class)
 public class FailWithoutRetriesIT {
 
   private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class);
@@ -84,10 +87,9 @@ public class FailWithoutRetriesIT {
   public static void setupCluster() throws Exception {
     // setup and verify the config
     Configuration conf = UTIL.getConfiguration();
+    setUpConfigForMiniCluster(conf);
     IndexTestingUtils.setupConfig(conf);
     IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
-    // set replication required parameter
-    ConfigUtil.setReplicationConfigIfAbsent(conf);
     // start the cluster
     UTIL.startMiniCluster();
   }
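
The class Javadoc above hinges on a subtle contract: per that Javadoc, HBase rebuilds a DoNotRetryIOException on the client through its String constructor, so a subclass that omits it deserializes to null and the client keeps retrying. A minimal sketch of a correctly shaped subclass (the name is illustrative):

    import org.apache.hadoop.hbase.DoNotRetryIOException;

    // The String constructor is what lets the client-side RPC machinery
    // reconstruct the exception; without it the client sees null and retries.
    public class NoRetrySketchException extends DoNotRetryIOException {
        public NoRetrySketchException(String message) {
            super(message);
        }
    }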

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 81ef78f..744b5d6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -17,29 +17,32 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.util.ConfigUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-import java.io.PrintWriter;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
+@Category(NeedsOwnMiniClusterTest.class)
 public class CsvBulkLoadToolIT {
 
     // We use HBaseTestingUtility because we need to start up a MapReduce cluster as well
@@ -51,7 +54,7 @@ public class CsvBulkLoadToolIT {
     public static void setUp() throws Exception {
         hbaseTestUtil = new HBaseTestingUtility();
         Configuration conf = hbaseTestUtil.getConfiguration();
-        ConfigUtil.setReplicationConfigIfAbsent(conf);
+        setUpConfigForMiniCluster(conf);
         hbaseTestUtil.startMiniCluster();
         hbaseTestUtil.startMiniMapReduceCluster();
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index 7834d0e..3df23a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -174,12 +174,15 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
                     SQLCloseables.closeAll(connectionQueryServices);
                 } finally {
                     // We know there's a services object if any connections were made
-                    services.getExecutor().shutdownNow();
+                    services.close();
                 }
             } finally {
+                // Even if something went wrong while closing the services above, we still
+                // want to set the field to null. Otherwise, we could end up holding on to
+                // a possibly non-working services instance.
+                services = null;
                 connectionQueryServices.clear();
             }
         }
-        services = null;
     }
 }
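
The reshuffle in close() above is about exception safety: with services = null inside the outer finally, the field is cleared even when services.close() throws, so no later caller can pick up a half-closed instance. A reduced sketch of that control flow (names illustrative, not Phoenix's API):

    // The finally placement guarantees the field is reset whether or not
    // close() threw, preventing reuse of a possibly broken instance.
    final class CloseOrderingSketch {
        private AutoCloseable services;

        void close() throws Exception {
            if (services == null) return;
            try {
                services.close();   // may throw
            } finally {
                services = null;    // still runs if close() threw
            }
        }
    }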

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
index 1e8fec7..10e562b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java
@@ -69,6 +69,7 @@ public abstract class BaseQueryServicesImpl implements QueryServices {
 
     @Override
     public void close() {
+        executor.shutdown();
     }
 
     @Override
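
Paired with the PhoenixDriver change above, close() now drains the executor rather than interrupting it: ExecutorService.shutdown() rejects new tasks but lets queued work finish, whereas the old shutdownNow() call interrupts in-flight tasks. A small standalone contrast (the executor here is illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ShutdownContrastSketch {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.submit(() -> System.out.println("queued task still runs"));

            executor.shutdown();                           // graceful: drain the queue
            executor.awaitTermination(10, TimeUnit.SECONDS);
            // executor.shutdownNow() would instead interrupt running tasks
            // and return the list of tasks that never started.
        }
    }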

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
index bdf4144..3c8a6df 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.hbase.index.write;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.hbase.index.IndexTestingUtils;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.TableName;
@@ -70,6 +72,7 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Multimap;
 
@@ -78,6 +81,7 @@ import com.google.common.collect.Multimap;
  * region was present on the same server, we have to make a best effort to not kill the server for
  * not succeeding on index writes while the index region is coming up.
  */
+@Category(NeedsOwnMiniClusterTest.class)
 public class TestWALRecoveryCaching {
 
   private static final Log LOG = LogFactory.getLog(TestWALRecoveryCaching.class);
@@ -145,6 +149,7 @@ public class TestWALRecoveryCaching {
   public void testWaitsOnIndexRegionToReload() throws Exception {
     HBaseTestingUtility util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
+    setUpConfigForMiniCluster(conf);
 
     // setup other useful stats
     IndexTestingUtils.setupConfig(conf);

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index afc7143..7d0755f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.jdbc;
 import java.sql.SQLException;
 import java.util.Properties;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import org.apache.phoenix.end2end.ConnectionQueryServicesTestImpl;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionlessQueryServicesImpl;
@@ -37,26 +39,27 @@ import org.apache.phoenix.util.ReadOnlyProps;
  * 
  * @since 0.1
  */
+@ThreadSafe
 public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
-    private ConnectionQueryServices queryServices;
+    
+    private ConnectionQueryServices connectionQueryServices;
     private final ReadOnlyProps overrideProps;
-    private QueryServices services;
+    private final QueryServices queryServices;
 
     public PhoenixTestDriver() {
         this.overrideProps = ReadOnlyProps.EMPTY_PROPS;
+        queryServices = new QueryServicesTestImpl();
     }
 
     // For tests to override the default configuration
-    public PhoenixTestDriver(ReadOnlyProps overrideProps) {
-        this.overrideProps = overrideProps;
+    public PhoenixTestDriver(ReadOnlyProps props) {
+        overrideProps = props;
+        queryServices = new QueryServicesTestImpl(overrideProps);
     }
 
     @Override
-    public synchronized QueryServices getQueryServices() {
-        if (services == null) {
-            services = new QueryServicesTestImpl(overrideProps);
-        }
-        return services;
+    public QueryServices getQueryServices() {
+        return queryServices;
     }
 
     @Override
@@ -66,27 +69,22 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
     }
 
     @Override // public for testing
-    public ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
-        if (queryServices != null) {
-            return queryServices;
-        }
-        QueryServices services = getQueryServices();
+    public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
+        if (connectionQueryServices != null) { return connectionQueryServices; }
         ConnectionInfo connInfo = ConnectionInfo.create(url);
         if (connInfo.isConnectionless()) {
-            queryServices =  new ConnectionlessQueryServicesImpl(services);
+            connectionQueryServices = new ConnectionlessQueryServicesImpl(queryServices);
         } else {
-            queryServices =  new ConnectionQueryServicesTestImpl(services, connInfo);
+            connectionQueryServices = new ConnectionQueryServicesTestImpl(queryServices, connInfo);
         }
-        queryServices.init(url, info);
-        return queryServices;
+        connectionQueryServices.init(url, info);
+        return connectionQueryServices;
     }
     
     @Override
-    public void close() throws SQLException {
-        try {
-            queryServices.close();
-        } finally {
-            queryServices = null;
-        }
+    public synchronized void close() throws SQLException {
+        connectionQueryServices.close();
+        queryServices.close();
     }
+
 }
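
The driver rewrite above also tightens the concurrency story: getConnectionQueryServices() is now synchronized, closing the window in the old lazy init where two threads could both observe a null field and construct duplicate services. The shape, reduced to its essentials (types simplified, not Phoenix's API):

    // Lazily creates one shared instance; synchronizing the accessor means
    // two threads can never both see null and build duplicates.
    final class LazySingletonSketch {
        private Object instance;

        synchronized Object get() {
            if (instance == null) {
                instance = new Object(); // constructed at most once
            }
            return instance;
        }
    }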

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
index e8a3716..6fa9c7f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
@@ -29,6 +29,7 @@ import static org.apache.phoenix.util.TestUtil.MULTI_CF_NAME;
 import static org.apache.phoenix.util.TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
 import static org.apache.phoenix.util.TestUtil.PTSDB_NAME;
 import static org.apache.phoenix.util.TestUtil.TABLE_WITH_ARRAY;
+import static org.junit.Assert.assertTrue;
 
 import java.sql.DriverManager;
 import java.util.Properties;
@@ -36,11 +37,14 @@ import java.util.Properties;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 
@@ -66,7 +70,28 @@ public class BaseConnectionlessQueryTest extends BaseTest {
     protected static String getUrl(String tenantId) {
         return getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
     }
-
+    
+    protected static PhoenixTestDriver driver;
+    
+    private static void startServer(String url) throws Exception {
+        //assertNull(driver);
+        // only load the test driver if we are testing locally - for integration tests, we want to
+        // test on a wider scale
+        if (PhoenixEmbeddedDriver.isTestUrl(url)) {
+            driver = initDriver(ReadOnlyProps.EMPTY_PROPS);
+            assertTrue(DriverManager.getDriver(url) == driver);
+            driver.connect(url, TestUtil.TEST_PROPERTIES);
+        }
+    }
+    
+    protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception {
+        if (driver == null) {
+            driver = new PhoenixTestDriver(props);
+            DriverManager.registerDriver(driver);
+        }
+        return driver;
+    }
+    
     @BeforeClass
     public static void doSetup() throws Exception {
         startServer(getUrl());

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 6a76178..e105fbb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -17,11 +17,29 @@
  */
 package org.apache.phoenix.query;
 
+import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
 import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
+import static org.apache.phoenix.util.TestUtil.A_VALUE;
 import static org.apache.phoenix.util.TestUtil.BTABLE_NAME;
+import static org.apache.phoenix.util.TestUtil.B_VALUE;
 import static org.apache.phoenix.util.TestUtil.CUSTOM_ENTITY_DATA_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.C_VALUE;
+import static org.apache.phoenix.util.TestUtil.ENTITYHISTID1;
+import static org.apache.phoenix.util.TestUtil.ENTITYHISTID2;
+import static org.apache.phoenix.util.TestUtil.ENTITYHISTID3;
+import static org.apache.phoenix.util.TestUtil.ENTITYHISTID4;
+import static org.apache.phoenix.util.TestUtil.ENTITYHISTID5;
+import static org.apache.phoenix.util.TestUtil.ENTITYHISTID6;
+import static org.apache.phoenix.util.TestUtil.ENTITYHISTID7;
+import static org.apache.phoenix.util.TestUtil.ENTITYHISTID8;
+import static org.apache.phoenix.util.TestUtil.ENTITYHISTID9;
 import static org.apache.phoenix.util.TestUtil.ENTITY_HISTORY_SALTED_TABLE_NAME;
 import static org.apache.phoenix.util.TestUtil.ENTITY_HISTORY_TABLE_NAME;
+import static org.apache.phoenix.util.TestUtil.E_VALUE;
 import static org.apache.phoenix.util.TestUtil.FUNKY_NAME;
 import static org.apache.phoenix.util.TestUtil.GROUPBYTEST_NAME;
 import static org.apache.phoenix.util.TestUtil.HBASE_DYNAMIC_COLUMNS;
@@ -33,42 +51,104 @@ import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.KEYONLY_NAME;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
 import static org.apache.phoenix.util.TestUtil.MDTEST_NAME;
+import static org.apache.phoenix.util.TestUtil.MILLIS_IN_DAY;
 import static org.apache.phoenix.util.TestUtil.MULTI_CF_NAME;
 import static org.apache.phoenix.util.TestUtil.MUTABLE_INDEX_DATA_TABLE;
+import static org.apache.phoenix.util.TestUtil.PARENTID1;
+import static org.apache.phoenix.util.TestUtil.PARENTID2;
+import static org.apache.phoenix.util.TestUtil.PARENTID3;
+import static org.apache.phoenix.util.TestUtil.PARENTID4;
+import static org.apache.phoenix.util.TestUtil.PARENTID5;
+import static org.apache.phoenix.util.TestUtil.PARENTID6;
+import static org.apache.phoenix.util.TestUtil.PARENTID7;
+import static org.apache.phoenix.util.TestUtil.PARENTID8;
+import static org.apache.phoenix.util.TestUtil.PARENTID9;
 import static org.apache.phoenix.util.TestUtil.PRODUCT_METRICS_NAME;
 import static org.apache.phoenix.util.TestUtil.PTSDB2_NAME;
 import static org.apache.phoenix.util.TestUtil.PTSDB3_NAME;
 import static org.apache.phoenix.util.TestUtil.PTSDB_NAME;
+import static org.apache.phoenix.util.TestUtil.ROW1;
+import static org.apache.phoenix.util.TestUtil.ROW2;
+import static org.apache.phoenix.util.TestUtil.ROW3;
+import static org.apache.phoenix.util.TestUtil.ROW4;
+import static org.apache.phoenix.util.TestUtil.ROW5;
+import static org.apache.phoenix.util.TestUtil.ROW6;
+import static org.apache.phoenix.util.TestUtil.ROW7;
+import static org.apache.phoenix.util.TestUtil.ROW8;
+import static org.apache.phoenix.util.TestUtil.ROW9;
 import static org.apache.phoenix.util.TestUtil.STABLE_NAME;
 import static org.apache.phoenix.util.TestUtil.TABLE_WITH_ARRAY;
 import static org.apache.phoenix.util.TestUtil.TABLE_WITH_SALTING;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
+import java.sql.Array;
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
+import org.apache.phoenix.schema.PMetaData;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.ConfigUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.junit.AfterClass;
+import org.junit.Assert;
 
 import com.google.common.collect.ImmutableMap;
 
+/**
+ * 
+ * Base class that contains all the methods needed by
+ * client-managed-time and HBase-managed-time tests.
+ * 
+ * For tests needing connectivity to a cluster, please use
+ * {@link BaseHBaseManagedTimeIT} or {@link BaseClientManagedTimeIT}. 
+ * 
+ * In the rare case when a test can't share the same mini cluster as the 
+ * ones used by {@link BaseHBaseManagedTimeIT} or {@link BaseClientManagedTimeIT},
+ * extend this class and spin up your own mini cluster. Please make sure
+ * to shut down the mini cluster in a method annotated with @AfterClass.
+ *
+ */
 public abstract class BaseTest {
     private static final Map<String,String> tableDDLMap;
+    private static final Logger logger = Logger.getLogger(BaseTest.class.getName());
+    
     static {
         ImmutableMap.Builder<String,String> builder = ImmutableMap.builder();
         builder.put(ENTITY_HISTORY_TABLE_NAME,"create table " + ENTITY_HISTORY_TABLE_NAME +
@@ -350,79 +430,160 @@ public abstract class BaseTest {
                 "    loc_id char(5))");
         tableDDLMap = builder.build();
     }
-
+    
     private static final String ORG_ID = "00D300000000XHP";
-
-    protected static String getOrganizationId() {
-        return ORG_ID;
+    private static final int NUM_SLAVES_BASE = 1;
+    
+    protected static String getZKClientPort(Configuration conf) {
+        return conf.get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
     }
-
-    private static long timestamp;
-
-    public static long nextTimestamp() {
-        timestamp += 100;
-        return timestamp;
+    
+    /**
+     * Set up the test hbase cluster. 
+     * @return url to be used by clients to connect to the cluster.
+     */
+    protected static String setUpTestCluster(@Nonnull Configuration conf) {
+        boolean isDistributedCluster = isDistributedClusterModeEnabled(conf);
+        if (!isDistributedCluster) {
+            return initMiniCluster(conf);
+        } else {
+            return initClusterDistributedMode(conf);
+        }
     }
-
-    protected static PhoenixTestDriver driver;
-    private static int driverRefCount = 0;
-
-    protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception {
-        if (driver == null) {
-            if (driverRefCount == 0) {
-                BaseTest.driver = new PhoenixTestDriver(props);
-                DriverManager.registerDriver(driver);
-                driverRefCount++;
-            }
+    
+    private static boolean isDistributedClusterModeEnabled(Configuration conf) {
+        boolean isDistributedCluster = false;
+        //check if the distributed mode was specified as a system property.
+        isDistributedCluster = Boolean.parseBoolean(System.getProperty(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, "false"));
+        if (!isDistributedCluster) {
+            //fall back on hbase-default.xml or hbase-site.xml to check for distributed mode
+            isDistributedCluster = conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false);
         }
-        return BaseTest.driver;
+        return isDistributedCluster;
     }
+    
+    /**
+     * Initialize the mini cluster using phoenix-test specific configuration.
+     * @return url to be used by clients to connect to the mini cluster.
+     */
+    private static String initMiniCluster(Configuration conf) {
+        setUpConfigForMiniCluster(conf);
+        final HBaseTestingUtility utility = new HBaseTestingUtility(conf);
+        try {
+            utility.startMiniCluster();
+            String clientPort = utility.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
 
-    // We need to deregister an already existing driver in order
-    // to register a new one. We need to create a new one so that
-    // we register the new one with the new Configuration instance.
-    // Otherwise, we get connection errors because the prior one
-    // is no longer associated with the miniCluster.
-    protected static synchronized boolean destroyDriver() {
-        if (driver != null) {
-            driverRefCount--;
-            if (driverRefCount == 0) {
-                try {
+            // add shutdown hook to kill the mini cluster
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
                     try {
-                        driver.close();
-                        return true;
-                    } finally {
-                        try {
-                            DriverManager.deregisterDriver(driver);
-                        } finally {
-                            driver = null;
-                        }
+                        utility.shutdownMiniCluster();
+                    } catch (Exception e) {
+                        logger.log(Level.WARNING, "Exception caught when shutting down mini cluster: " + e.getMessage());
                     }
-                } catch (SQLException e) {
                 }
-            }
+            });
+            return JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
+                    + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+        } catch (Throwable t) {
+            throw new RuntimeException(t);
         }
-        return false;
+    }
+    
+    /**
+     * Initialize the cluster in distributed mode.
+     * @return url to be used by clients to connect to the cluster.
+     */
+    private static String initClusterDistributedMode(Configuration conf) {
+        setTestConfigForDistributedCluster(conf);
+        try {
+            IntegrationTestingUtility util =  new IntegrationTestingUtility(conf);
+            util.initializeCluster(NUM_SLAVES_BASE);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return JDBC_PROTOCOL + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+    }
+
+    private static void setTestConfigForDistributedCluster(Configuration conf) {
+        setDefaultTestConfig(conf);
+    }
+    
+    private static void setDefaultTestConfig(Configuration conf) {
+        ConfigUtil.setReplicationConfigIfAbsent(conf);
+        QueryServicesOptions options = QueryServicesTestImpl.getDefaultTestServicesOptions();
+        for (Entry<String,String> entry : options.getProps()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        //no point doing sanity checks when running tests.
+        conf.setBoolean("hbase.table.sanity.checks", false);
+    }
+    
+    public static Configuration setUpConfigForMiniCluster(Configuration conf) {
+        assertNotNull(conf);
+        setDefaultTestConfig(conf);
+        /*
+         * The default configuration of the mini cluster ends up spawning a lot of
+         * threads that are not really needed by Phoenix for test purposes. Limiting
+         * these threads helps us run several mini clusters at the same time without
+         * hitting the thread limit imposed by the OS.
+         */
+        conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5);
+        conf.setInt(HConstants.REGION_SERVER_META_HANDLER_COUNT, 2);
+        conf.setInt(HConstants.MASTER_HANDLER_COUNT, 2);
+        conf.setInt("dfs.namenode.handler.count", 1);
+        conf.setInt("dfs.namenode.service.handler.count", 1);
+        conf.setInt("dfs.datanode.handler.count", 1);
+        conf.setInt("hadoop.http.max.threads", 1);
+        conf.setInt("ipc.server.read.threadpool.size", 2);
+        conf.setInt("ipc.server.handler.threadpool.size", 2);
+        conf.setInt("hbase.hconnection.threads.max", 2);
+        conf.setInt("hbase.hconnection.threads.core", 2);
+        conf.setInt("hbase.htable.threads.max", 2);
+        conf.setInt("hbase.regionserver.hlog.syncer.count", 2);
+        conf.setInt("hbase.hlog.asyncer.number", 2);
+        conf.setInt("hbase.assignment.zkevent.workers", 5);
+        conf.setInt("hbase.assignment.threads.max", 5);
+        return conf;
     }
 
-    protected static void startServer(String url, ReadOnlyProps props) throws Exception {
-        assertNull(BaseTest.driver);
-        // only load the test driver if we are testing locally - for integration tests, we want to
-        // test on a wider scale
-        if (PhoenixEmbeddedDriver.isTestUrl(url)) {
-            PhoenixTestDriver driver = initDriver(props);
-            assertTrue(DriverManager.getDriver(url) == driver);
-            driver.connect(url, TestUtil.TEST_PROPERTIES);
+    /**
+     * Create a {@link PhoenixTestDriver} and register it.
+     * @return an initialized and registered {@link PhoenixTestDriver} 
+     */
+    protected static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
+        PhoenixTestDriver driver = new PhoenixTestDriver(props);
+        DriverManager.registerDriver(driver);
+        Assert.assertTrue(DriverManager.getDriver(url) == driver);
+        driver.connect(url, TestUtil.TEST_PROPERTIES);
+        return driver;
+    }
+    
+    //Close and unregister the driver.
+    protected static boolean destroyDriver(PhoenixTestDriver driver) {
+        if (driver != null) {
+            try {
+                try {
+                    driver.close();
+                    return true;
+                } finally {
+                    DriverManager.deregisterDriver(driver);
+                }
+            } catch (Exception ignored) {}
         }
+        return false;
     }
     
-    protected static void startServer(String url) throws Exception {
-        startServer(url, ReadOnlyProps.EMPTY_PROPS);
+    protected static String getOrganizationId() {
+        return ORG_ID;
     }
 
-    @AfterClass
-    public static void stopServer() throws Exception {
-        assertTrue(destroyDriver());
+    private static long timestamp;
+
+    public static long nextTimestamp() {
+        timestamp += 100;
+        return timestamp;
     }
 
     protected static void ensureTableCreated(String url, String tableName) throws SQLException {
@@ -482,4 +643,673 @@ public abstract class BaseTest {
             conn.close();
         }
     }
+    
+    protected static byte[][] getDefaultSplits(String tenantId) {
+        return new byte[][] { 
+            Bytes.toBytes(tenantId + "00A"),
+            Bytes.toBytes(tenantId + "00B"),
+            Bytes.toBytes(tenantId + "00C"),
+            };
+    }
+    
+    protected static void deletePriorTables(long ts, String url) throws Exception {
+        deletePriorTables(ts, (String)null, url);
+    }
+    
+    protected static void deletePriorTables(long ts, String tenantId, String url) throws Exception {
+        Properties props = new Properties();
+        if (ts != HConstants.LATEST_TIMESTAMP) {
+            props.setProperty(CURRENT_SCN_ATTRIB, Long.toString(ts));
+        }
+        Connection conn = DriverManager.getConnection(url, props);
+        try {
+            deletePriorTables(ts, conn, url);
+            deletePriorSequences(ts, conn);
+        }
+        finally {
+            conn.close();
+        }
+    }
+    
+    private static void deletePriorTables(long ts, Connection globalConn, String url) throws Exception {
+        DatabaseMetaData dbmd = globalConn.getMetaData();
+        PMetaData cache = globalConn.unwrap(PhoenixConnection.class).getMetaDataCache();
+        cache.getTables();
+        // Drop VIEWs first, as we don't allow a TABLE with views to be dropped
+        // Tables are sorted by TENANT_ID
+        List<String[]> tableTypesList = Arrays.asList(new String[] {PTableType.VIEW.toString()}, new String[] {PTableType.TABLE.toString()});
+        for (String[] tableTypes: tableTypesList) {
+            ResultSet rs = dbmd.getTables(null, null, null, tableTypes);
+            String lastTenantId = null;
+            Connection conn = globalConn;
+            while (rs.next()) {
+                String fullTableName = SchemaUtil.getEscapedTableName(
+                        rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM),
+                        rs.getString(PhoenixDatabaseMetaData.TABLE_NAME));
+                String ddl = "DROP " + rs.getString(PhoenixDatabaseMetaData.TABLE_TYPE) + " " + fullTableName;
+                String tenantId = rs.getString(1);
+                if (tenantId != null && !tenantId.equals(lastTenantId))  {
+                    if (lastTenantId != null) {
+                        conn.close();
+                    }
+                    // Open tenant-specific connection when we find a new one
+                    Properties props = new Properties(globalConn.getClientInfo());
+                    props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+                    conn = DriverManager.getConnection(url, props);
+                    lastTenantId = tenantId;
+                }
+                try {
+                    conn.createStatement().executeUpdate(ddl);
+                } catch (NewerTableAlreadyExistsException ex) {
+                    logger.info("Newer table " + fullTableName + " or its delete marker exists. Ignore current deletion");
+                } catch (TableNotFoundException ex) {
+                    logger.info("Table " + fullTableName + " is already deleted.");
+                }
+            }
+            if (lastTenantId != null) {
+                conn.close();
+            }
+        }
+    }
+    
+    private static void deletePriorSequences(long ts, Connection conn) throws Exception {
+        // TODO: drop tenant-specific sequences too
+        ResultSet rs = conn.createStatement().executeQuery("SELECT " 
+                + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + "," 
+                + PhoenixDatabaseMetaData.SEQUENCE_NAME 
+                + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
+        while (rs.next()) {
+            try {
+                conn.createStatement().execute("DROP SEQUENCE " + SchemaUtil.getTableName(rs.getString(1), rs.getString(2)));
+            } catch (Exception e) {
+                //FIXME: see https://issues.apache.org/jira/browse/PHOENIX-973
+            }
+        }
+    }
+    
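+    /**
+     * Creates SumDoubleTest (pre-split when splits are provided) and seeds five
+     * rows of increasing DOUBLE/FLOAT values in both signed and unsigned columns.
+     */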
+    protected static void initSumDoubleValues(byte[][] splits, String url) throws Exception {
+        ensureTableCreated(url, "SumDoubleTest", splits);
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(url, props);
+        try {
+            // Insert all rows at ts
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " +
+                    "SumDoubleTest(" +
+                    "    id, " +
+                    "    d, " +
+                    "    f, " +
+                    "    ud, " +
+                    "    uf) " +
+                    "VALUES (?, ?, ?, ?, ?)");
+            stmt.setString(1, "1");
+            stmt.setDouble(2, 0.001);
+            stmt.setFloat(3, 0.01f);
+            stmt.setDouble(4, 0.001);
+            stmt.setFloat(5, 0.01f);
+            stmt.execute();
+                
+            stmt.setString(1, "2");
+            stmt.setDouble(2, 0.002);
+            stmt.setFloat(3, 0.02f);
+            stmt.setDouble(4, 0.002);
+            stmt.setFloat(5, 0.02f);
+            stmt.execute();
+                
+            stmt.setString(1, "3");
+            stmt.setDouble(2, 0.003);
+            stmt.setFloat(3, 0.03f);
+            stmt.setDouble(4, 0.003);
+            stmt.setFloat(5, 0.03f);
+            stmt.execute();
+                
+            stmt.setString(1, "4");
+            stmt.setDouble(2, 0.004);
+            stmt.setFloat(3, 0.04f);
+            stmt.setDouble(4, 0.004);
+            stmt.setFloat(5, 0.04f);
+            stmt.execute();
+                
+            stmt.setString(1, "5");
+            stmt.setDouble(2, 0.005);
+            stmt.setFloat(3, 0.05f);
+            stmt.setDouble(4, 0.005);
+            stmt.setFloat(5, 0.05f);
+            stmt.execute();
+                
+            conn.commit();
+        } finally {
+            conn.close();
+        }
+    }
+    
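+    /**
+     * Creates ATABLE (at ts-5 when an SCN is given) and seeds nine rows, ROW1
+     * through ROW9, exercising the string, date, and numeric columns.
+     */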
+    protected static void initATableValues(String tenantId, byte[][] splits, String url) throws Exception {
+        initATableValues(tenantId, splits, null, url);
+    }
+    
+    protected static void initATableValues(String tenantId, byte[][] splits, Date date, String url) throws Exception {
+        initATableValues(tenantId, splits, date, null, url);
+    }
+    
+    protected static void initATableValues(String tenantId, byte[][] splits, Date date, Long ts, String url) throws Exception {
+        if (ts == null) {
+            ensureTableCreated(url, ATABLE_NAME, splits);
+        } else {
+            ensureTableCreated(url, ATABLE_NAME, splits, ts-5);
+        }
+        
+        Properties props = new Properties();
+        if (ts != null) {
+            props.setProperty(CURRENT_SCN_ATTRIB, Long.toString(ts-3));
+        }
+        Connection conn = DriverManager.getConnection(url, props);
+        try {
+            // Insert all rows at ts
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " +
+                    "ATABLE(" +
+                    "    ORGANIZATION_ID, " +
+                    "    ENTITY_ID, " +
+                    "    A_STRING, " +
+                    "    B_STRING, " +
+                    "    A_INTEGER, " +
+                    "    A_DATE, " +
+                    "    X_DECIMAL, " +
+                    "    X_LONG, " +
+                    "    X_INTEGER," +
+                    "    Y_INTEGER," +
+                    "    A_BYTE," +
+                    "    A_SHORT," +
+                    "    A_FLOAT," +
+                    "    A_DOUBLE," +
+                    "    A_UNSIGNED_FLOAT," +
+                    "    A_UNSIGNED_DOUBLE)" +
+                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW1);
+            stmt.setString(3, A_VALUE);
+            stmt.setString(4, B_VALUE);
+            stmt.setInt(5, 1);
+            stmt.setDate(6, date);
+            stmt.setBigDecimal(7, null);
+            stmt.setNull(8, Types.BIGINT);
+            stmt.setNull(9, Types.INTEGER);
+            stmt.setNull(10, Types.INTEGER);
+            stmt.setByte(11, (byte)1);
+            stmt.setShort(12, (short) 128);
+            stmt.setFloat(13, 0.01f);
+            stmt.setDouble(14, 0.0001);
+            stmt.setFloat(15, 0.01f);
+            stmt.setDouble(16, 0.0001);
+            stmt.execute();
+                
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW2);
+            stmt.setString(3, A_VALUE);
+            stmt.setString(4, C_VALUE);
+            stmt.setInt(5, 2);
+            stmt.setDate(6, date == null ? null : new Date(date.getTime() + MILLIS_IN_DAY * 1));
+            stmt.setBigDecimal(7, null);
+            stmt.setNull(8, Types.BIGINT);
+            stmt.setNull(9, Types.INTEGER);
+            stmt.setNull(10, Types.INTEGER);
+            stmt.setByte(11, (byte)2);
+            stmt.setShort(12, (short) 129);
+            stmt.setFloat(13, 0.02f);
+            stmt.setDouble(14, 0.0002);
+            stmt.setFloat(15, 0.02f);
+            stmt.setDouble(16, 0.0002);
+            stmt.execute();
+                
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW3);
+            stmt.setString(3, A_VALUE);
+            stmt.setString(4, E_VALUE);
+            stmt.setInt(5, 3);
+            stmt.setDate(6, date == null ? null : new Date(date.getTime() + MILLIS_IN_DAY * 2));
+            stmt.setBigDecimal(7, null);
+            stmt.setNull(8, Types.BIGINT);
+            stmt.setNull(9, Types.INTEGER);
+            stmt.setNull(10, Types.INTEGER);
+            stmt.setByte(11, (byte)3);
+            stmt.setShort(12, (short) 130);
+            stmt.setFloat(13, 0.03f);
+            stmt.setDouble(14, 0.0003);
+            stmt.setFloat(15, 0.03f);
+            stmt.setDouble(16, 0.0003);
+            stmt.execute();
+                
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW4);
+            stmt.setString(3, A_VALUE);
+            stmt.setString(4, B_VALUE);
+            stmt.setInt(5, 4);
+            stmt.setDate(6, date == null ? null : date);
+            stmt.setBigDecimal(7, null);
+            stmt.setNull(8, Types.BIGINT);
+            stmt.setNull(9, Types.INTEGER);
+            stmt.setNull(10, Types.INTEGER);
+            stmt.setByte(11, (byte)4);
+            stmt.setShort(12, (short) 131);
+            stmt.setFloat(13, 0.04f);
+            stmt.setDouble(14, 0.0004);
+            stmt.setFloat(15, 0.04f);
+            stmt.setDouble(16, 0.0004);
+            stmt.execute();
+                
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW5);
+            stmt.setString(3, B_VALUE);
+            stmt.setString(4, C_VALUE);
+            stmt.setInt(5, 5);
+            stmt.setDate(6, date == null ? null : new Date(date.getTime() + MILLIS_IN_DAY * 1));
+            stmt.setBigDecimal(7, null);
+            stmt.setNull(8, Types.BIGINT);
+            stmt.setNull(9, Types.INTEGER);
+            stmt.setNull(10, Types.INTEGER);
+            stmt.setByte(11, (byte)5);
+            stmt.setShort(12, (short) 132);
+            stmt.setFloat(13, 0.05f);
+            stmt.setDouble(14, 0.0005);
+            stmt.setFloat(15, 0.05f);
+            stmt.setDouble(16, 0.0005);
+            stmt.execute();
+                
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW6);
+            stmt.setString(3, B_VALUE);
+            stmt.setString(4, E_VALUE);
+            stmt.setInt(5, 6);
+            stmt.setDate(6, date == null ? null : new Date(date.getTime() + MILLIS_IN_DAY * 2));
+            stmt.setBigDecimal(7, null);
+            stmt.setNull(8, Types.BIGINT);
+            stmt.setNull(9, Types.INTEGER);
+            stmt.setNull(10, Types.INTEGER);
+            stmt.setByte(11, (byte)6);
+            stmt.setShort(12, (short) 133);
+            stmt.setFloat(13, 0.06f);
+            stmt.setDouble(14, 0.0006);
+            stmt.setFloat(15, 0.06f);
+            stmt.setDouble(16, 0.0006);
+            stmt.execute();
+                
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW7);
+            stmt.setString(3, B_VALUE);
+            stmt.setString(4, B_VALUE);
+            stmt.setInt(5, 7);
+            stmt.setDate(6, date == null ? null : date);
+            stmt.setBigDecimal(7, BigDecimal.valueOf(0.1));
+            stmt.setLong(8, 5L);
+            stmt.setInt(9, 5);
+            stmt.setNull(10, Types.INTEGER);
+            stmt.setByte(11, (byte)7);
+            stmt.setShort(12, (short) 134);
+            stmt.setFloat(13, 0.07f);
+            stmt.setDouble(14, 0.0007);
+            stmt.setFloat(15, 0.07f);
+            stmt.setDouble(16, 0.0007);
+            stmt.execute();
+                
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW8);
+            stmt.setString(3, B_VALUE);
+            stmt.setString(4, C_VALUE);
+            stmt.setInt(5, 8);
+            stmt.setDate(6, date == null ? null : new Date(date.getTime() + MILLIS_IN_DAY * 1));
+            stmt.setBigDecimal(7, BigDecimal.valueOf(3.9));
+            long l = Integer.MIN_VALUE - 1L;
+            assert(l < Integer.MIN_VALUE);
+            stmt.setLong(8, l);
+            stmt.setInt(9, 4);
+            stmt.setNull(10, Types.INTEGER);
+            stmt.setByte(11, (byte)8);
+            stmt.setShort(12, (short) 135);
+            stmt.setFloat(13, 0.08f);
+            stmt.setDouble(14, 0.0008);
+            stmt.setFloat(15, 0.08f);
+            stmt.setDouble(16, 0.0008);
+            stmt.execute();
+                
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW9);
+            stmt.setString(3, C_VALUE);
+            stmt.setString(4, E_VALUE);
+            stmt.setInt(5, 9);
+            stmt.setDate(6, date == null ? null : new Date(date.getTime() + MILLIS_IN_DAY * 2));
+            stmt.setBigDecimal(7, BigDecimal.valueOf(3.3));
+            l = Integer.MAX_VALUE + 1L;
+            assert(l > Integer.MAX_VALUE);
+            stmt.setLong(8, l);
+            stmt.setInt(9, 3);
+            stmt.setInt(10, 300);
+            stmt.setByte(11, (byte)9);
+            stmt.setShort(12, (short) 0);
+            stmt.setFloat(13, 0.09f);
+            stmt.setDouble(14, 0.0009);
+            stmt.setFloat(15, 0.09f);
+            stmt.setDouble(16, 0.0009);
+            stmt.execute();
+                
+            conn.commit();
+        } finally {
+            conn.close();
+        }
+    }
+    
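+    /**
+     * Seeds one row of TABLE_WITH_ARRAY, populating the VARCHAR, BIGINT, TINYINT,
+     * and DOUBLE array columns; when useNull is true the string array contains a
+     * null element.
+     */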
+    protected static void initTablesWithArrays(String tenantId, Date date, Long ts, boolean useNull, String url) throws Exception {
+        Properties props = new Properties();
+        if (ts != null) {
+            props.setProperty(CURRENT_SCN_ATTRIB, ts.toString());
+        }
+        Connection conn = DriverManager.getConnection(url, props);
+        try {
+            // Insert all rows at ts
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " +
+                            "TABLE_WITH_ARRAY(" +
+                            "    ORGANIZATION_ID, " +
+                            "    ENTITY_ID, " +
+                            "    a_string_array, " +
+                            "    B_STRING, " +
+                            "    A_INTEGER, " +
+                            "    A_DATE, " +
+                            "    X_DECIMAL, " +
+                            "    x_long_array, " +
+                            "    X_INTEGER," +
+                            "    a_byte_array," +
+                            "    A_SHORT," +
+                            "    A_FLOAT," +
+                            "    a_double_array," +
+                            "    A_UNSIGNED_FLOAT," +
+                            "    A_UNSIGNED_DOUBLE)" +
+                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW1);
+            // TODO: cover primitive array types once they are supported
+            String[] strArr = new String[4];
+            strArr[0] = "ABC";
+            if (useNull) {
+                strArr[1] = null;
+            } else {
+                strArr[1] = "CEDF";
+            }
+            strArr[2] = "XYZWER";
+            strArr[3] = "AB";
+            Array array = conn.createArrayOf("VARCHAR", strArr);
+            stmt.setArray(3, array);
+            stmt.setString(4, B_VALUE);
+            stmt.setInt(5, 1);
+            stmt.setDate(6, date);
+            stmt.setBigDecimal(7, null);
+            // TODO: use a primitive long[] once primitive arrays are supported
+            Long[] longArr = new Long[2];
+            longArr[0] = 25L;
+            longArr[1] = 36L;
+            array = conn.createArrayOf("BIGINT", longArr);
+            stmt.setArray(8, array);
+            stmt.setNull(9, Types.INTEGER);
+            // TODO: use a primitive byte[] once primitive arrays are supported
+            Byte[] byteArr = new Byte[2];
+            byteArr[0] = 25;
+            byteArr[1] = 36;
+            array = conn.createArrayOf("TINYINT", byteArr);
+            stmt.setArray(10, array);
+            stmt.setShort(11, (short) 128);
+            stmt.setFloat(12, 0.01f);
+            // TODO: use a primitive double[] once primitive arrays are supported
+            Double[] doubleArr = new Double[4];
+            doubleArr[0] = 25.343;
+            doubleArr[1] = 36.763;
+            doubleArr[2] = 37.56;
+            doubleArr[3] = 386.63;
+            array = conn.createArrayOf("DOUBLE", doubleArr);
+            stmt.setArray(13, array);
+            stmt.setFloat(14, 0.01f);
+            stmt.setDouble(15, 0.0001);
+            stmt.execute();
+
+            conn.commit();
+        } finally {
+            conn.close();
+        }
+    }
+    
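+    /**
+     * Creates the entity history table and seeds nine rows, one per PARENTID
+     * constant, all sharing the same created date and old/new values.
+     */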
+    protected static void initEntityHistoryTableValues(String tenantId, byte[][] splits, String url) throws Exception {
+        initEntityHistoryTableValues(tenantId, splits, null, url);
+    }
+    
+    protected static void initEntityHistoryTableValues(String tenantId, byte[][] splits, Date date, String url) throws Exception {
+        initEntityHistoryTableValues(tenantId, splits, date, null, url);
+    }
+    
+    protected static void initEntityHistoryTableValues(String tenantId, byte[][] splits, Date date, Long ts, String url) throws Exception {
+        if (ts == null) {
+            ensureTableCreated(url, ENTITY_HISTORY_TABLE_NAME, splits);
+        } else {
+            ensureTableCreated(url, ENTITY_HISTORY_TABLE_NAME, splits, ts-2);
+        }
+        
+        Properties props = new Properties();
+        if (ts != null) {
+            props.setProperty(CURRENT_SCN_ATTRIB, ts.toString());
+        }
+        Connection conn = DriverManager.getConnection(url, props);
+        try {
+            // Insert all rows at ts
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " +
+                    ENTITY_HISTORY_TABLE_NAME+
+                    "(" +
+                    "    ORGANIZATION_ID, " +
+                    "    PARENT_ID, " +
+                    "    CREATED_DATE, " +
+                    "    ENTITY_HISTORY_ID, " +
+                    "    OLD_VALUE, " +
+                    "    NEW_VALUE) " +
+                    "VALUES (?, ?, ?, ?, ?, ?)");
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID1);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID1);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+                
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID2);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID2);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID3);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID3);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID4);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID4);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID5);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID5);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID6);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID6);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID7);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID7);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID8);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID8);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID9);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID9);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            conn.commit();
+        } finally {
+            conn.close();
+        }
+    }
+    
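+    /**
+     * Same data as {@link #initEntityHistoryTableValues} but written to the
+     * salted variant of the entity history table.
+     */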
+    protected static void initSaltedEntityHistoryTableValues(String tenantId, byte[][] splits, Date date, Long ts, String url) throws Exception {
+        if (ts == null) {
+            ensureTableCreated(url, ENTITY_HISTORY_SALTED_TABLE_NAME, splits);
+        } else {
+            ensureTableCreated(url, ENTITY_HISTORY_SALTED_TABLE_NAME, splits, ts-2);
+        }
+        
+        Properties props = new Properties();
+        if (ts != null) {
+            props.setProperty(CURRENT_SCN_ATTRIB, ts.toString());
+        }
+        Connection conn = DriverManager.getConnection(url, props);
+        try {
+            // Insert all rows at ts
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " +
+                    ENTITY_HISTORY_SALTED_TABLE_NAME+
+                    "(" +
+                    "    ORGANIZATION_ID, " +
+                    "    PARENT_ID, " +
+                    "    CREATED_DATE, " +
+                    "    ENTITY_HISTORY_ID, " +
+                    "    OLD_VALUE, " +
+                    "    NEW_VALUE) " +
+                    "VALUES (?, ?, ?, ?, ?, ?)");
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID1);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID1);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+                
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID2);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID2);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID3);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID3);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID4);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID4);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID5);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID5);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID6);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID6);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID7);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID7);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID8);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID8);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            stmt.setString(1, tenantId);
+            stmt.setString(2, PARENTID9);
+            stmt.setDate(3, date);
+            stmt.setString(4, ENTITYHISTID9);
+            stmt.setString(5,  A_VALUE);
+            stmt.setString(6,  B_VALUE);
+            stmt.execute();
+            
+            conn.commit();
+        } finally {
+            conn.close();
+        }
+    }
+    
+    /**
+     * Disables and drops all tables except SYSTEM.CATALOG and SYSTEM.SEQUENCE.
+     */
+    protected static void disableAndDropNonSystemTables(PhoenixTestDriver driver) throws Exception {
+        HBaseAdmin admin = driver.getConnectionQueryServices(null, null).getAdmin();
+        try {
+            HTableDescriptor[] tables = admin.listTables();
+            for (HTableDescriptor table : tables) {
+                boolean isCatalogTable = (Bytes.compareTo(table.getName(), PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES) == 0);
+                boolean isSequenceTable = (Bytes.compareTo(table.getName(), PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES) == 0);
+                if (!isCatalogTable && !isSequenceTable) {
+                    admin.disableTable(table.getName());
+                    admin.deleteTable(table.getName());
+                }
+            }    
+        } finally {
+            admin.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 090b675..72e7aef 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -57,8 +57,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
         this(ReadOnlyProps.EMPTY_PROPS);
     }
     
-    public QueryServicesTestImpl(ReadOnlyProps overrideProps) {
-        super(withDefaults()
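+    /** Default test-only service options; callers may layer overrides on top via setAll(). */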
+    public static QueryServicesOptions getDefaultTestServicesOptions() {
+        return withDefaults()
                 .setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE)
                 .setQueueSize(DEFAULT_QUEUE_SIZE)
                 .setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC)
@@ -78,8 +78,10 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
                 .setWALEditCodec(DEFAULT_WAL_EDIT_CODEC)
                 .setDropMetaData(DEFAULT_DROP_METADATA)
                 .setMaxClientMetaDataCacheSize(DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE)
-                .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE)
-                .setAll(overrideProps)
-        );
+                .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE);
+    }
+    
+    public QueryServicesTestImpl(ReadOnlyProps overrideProps) {
+        super(getDefaultTestServicesOptions().setAll(overrideProps));
     }    
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java b/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
index fce9a9d..a4b3655 100644
--- a/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
+++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
@@ -30,13 +30,15 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.sink.DefaultSinkFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.junit.Assert;
-import org.junit.Test;
-
+import org.apache.phoenix.end2end.HBaseManagedTimeTest;
 import org.apache.phoenix.flume.serializer.EventSerializers;
 import org.apache.phoenix.flume.sink.PhoenixSink;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+@Category(HBaseManagedTimeTest.class)
 public class PhoenixSinkIT extends BaseHBaseManagedTimeIT {
 
     private Context sinkContext;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java b/phoenix-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java
index 33349a0..7f4a6af 100644
--- a/phoenix-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java
+++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java
@@ -40,15 +40,18 @@ import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.HBaseManagedTimeTest;
+import org.apache.phoenix.flume.serializer.EventSerializers;
+import org.apache.phoenix.flume.sink.PhoenixSink;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.flume.sink.PhoenixSink;
 
+@Category(HBaseManagedTimeTest.class)
 public class RegexEventSerializerIT extends BaseHBaseManagedTimeIT {
 
     private Context sinkContext;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/91b7922c/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
index 2bb47c8..28afb9a 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
@@ -19,6 +19,7 @@
  */
 package org.apache.phoenix.pig;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -36,9 +37,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.ConfigUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -56,6 +57,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import com.google.common.base.Preconditions;
 
@@ -63,6 +65,7 @@ import com.google.common.base.Preconditions;
  * 
  * Test class to run all the integration tests against a virtual map reduce cluster.
  */
+@Category(NeedsOwnMiniClusterTest.class)
 public class PhoenixHBaseLoaderIT {
     
     private static final Log LOG = LogFactory.getLog(PhoenixHBaseLoaderIT.class);
@@ -79,10 +82,8 @@ public class PhoenixHBaseLoaderIT {
     public static void setUpBeforeClass() throws Exception {
         hbaseTestUtil = new HBaseTestingUtility();
         conf = hbaseTestUtil.getConfiguration();
-        ConfigUtil.setReplicationConfigIfAbsent(conf);
+        setUpConfigForMiniCluster(conf);
         conf.set(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
-        conf.setInt(QueryServices.MASTER_INFO_PORT_ATTRIB, -1);
-        conf.setInt(QueryServices.REGIONSERVER_INFO_PORT_ATTRIB, -1);
         hbaseTestUtil.startMiniCluster();
 
         Class.forName(PhoenixDriver.class.getName());
@@ -439,4 +440,4 @@ public class PhoenixHBaseLoaderIT {
         conn.close();
         hbaseTestUtil.shutdownMiniCluster();
     }
-}
+}
\ No newline at end of file