You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2023/11/06 07:47:14 UTC

(phoenix) branch master updated: PHOENIX-6523 Support for HBase Registry Implementations through Phoenix connection URL

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 09d2299706 PHOENIX-6523 Support for HBase Registry Implementations through Phoenix connection URL
09d2299706 is described below

commit 09d229970627eb9b0a0b2ad565f10ba4d3bfeb9c
Author: Istvan Toth <st...@apache.org>
AuthorDate: Thu Aug 3 11:17:35 2023 +0200

    PHOENIX-6523 Support for HBase Registry Implementations through Phoenix connection URL
    
    Co-authored-by: Ramie <ra...@salesforce.com>
---
 .../org/apache/phoenix/end2end/ConnectionIT.java   | 156 ++++++
 .../end2end/ConnectionQueryServicesTestImpl.java   |   2 +-
 .../apache/phoenix/end2end/PhoenixTTLToolIT.java   |  43 +-
 .../SystemTablesCreationOnConnectionIT.java        |   8 +-
 .../phoenix/end2end/SystemTablesUpgradeIT.java     |   4 +-
 .../end2end/index/MutableIndexFailureIT.java       |   7 +-
 .../transform/TransformMonitorExtendedIT.java      |  19 +
 .../phoenix/jdbc/FailoverPhoenixConnectionIT.java  |  18 +-
 .../phoenix/jdbc/HighAvailabilityGroupIT.java      |   2 +-
 .../phoenix/jdbc/HighAvailabilityGroupTestIT.java  |   8 +-
 .../jdbc/HighAvailabilityTestingUtility.java       |  10 +-
 .../phoenix/jdbc/ParallelPhoenixConnectionIT.java  |   2 +
 .../phoenix/jdbc/SecureUserConnectionsIT.java      |  41 +-
 .../monitoring/PhoenixTableLevelMetricsIT.java     |  10 +-
 .../replication/SystemCatalogWALEntryFilterIT.java |  14 +-
 .../phoenix/jdbc/AbstractRPCConnectionInfo.java    | 263 ++++++++++
 .../org/apache/phoenix/jdbc/ClusterRoleRecord.java |   1 +
 .../org/apache/phoenix/jdbc/ConnectionInfo.java    | 545 ++++++++++++++++++++
 .../apache/phoenix/jdbc/HighAvailabilityGroup.java |  21 +-
 .../apache/phoenix/jdbc/MasterConnectionInfo.java  | 103 ++++
 .../phoenix/jdbc/ParallelPhoenixConnection.java    |   8 +-
 .../phoenix/jdbc/ParallelPhoenixContext.java       |   1 +
 .../org/apache/phoenix/jdbc/PhoenixDriver.java     |  25 +-
 .../apache/phoenix/jdbc/PhoenixEmbeddedDriver.java | 568 ++-------------------
 .../apache/phoenix/jdbc/PhoenixHAAdminTool.java    |   3 +-
 .../org/apache/phoenix/jdbc/RPCConnectionInfo.java | 188 +++++++
 .../org/apache/phoenix/jdbc/ZKConnectionInfo.java  | 341 +++++++++++++
 .../phoenix/mapreduce/AbstractBulkLoadTool.java    |   6 +-
 .../phoenix/mapreduce/util/ConnectionUtil.java     |  77 ++-
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  48 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |   2 +-
 .../query/ConnectionlessQueryServicesImpl.java     |   2 +-
 .../org/apache/phoenix/query/QueryServices.java    |   3 +
 .../java/org/apache/phoenix/trace/TraceReader.java |   1 -
 .../apache/phoenix/trace/TraceSpanReceiver.java    |   2 -
 .../NotAvailableTransactionProvider.java           |   2 +-
 .../transaction/OmidTransactionProvider.java       |   2 +-
 .../transaction/PhoenixTransactionProvider.java    |   2 +-
 .../java/org/apache/phoenix/util/JDBCUtil.java     |  49 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java    |  98 ++--
 .../java/org/apache/phoenix/util/QueryUtil.java    |  46 +-
 .../apache/phoenix/jdbc/ClusterRoleRecordTest.java |   4 +-
 .../jdbc/ParallelPhoenixConnectionFailureTest.java |   8 +-
 .../phoenix/jdbc/PhoenixEmbeddedDriverTest.java    | 534 ++++++++++++++++---
 .../org/apache/phoenix/jdbc/PhoenixTestDriver.java |  10 +-
 .../mapreduce/PhoenixMultiViewInputFormatTest.java |   5 +
 .../util/PhoenixConfigurationUtilTest.java         |  24 +-
 .../transaction/OmidTransactionService.java        |   2 +-
 .../transaction/TransactionServiceManager.java     |   2 +-
 .../java/org/apache/phoenix/util/JDBCUtilTest.java |   8 +-
 .../org/apache/phoenix/util/QueryUtilTest.java     |  16 +-
 51 files changed, 2517 insertions(+), 847 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionIT.java
new file mode 100644
index 0000000000..243b3c730a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionIT.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.util.InstanceResolver;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConnectionIT {
+
+    private static HBaseTestingUtility hbaseTestUtil;
+    private static Configuration conf;
+
+    private static int tableCounter;
+
+    @BeforeClass
+    public static synchronized void setUp() throws Exception {
+        hbaseTestUtil = new HBaseTestingUtility();
+        conf = hbaseTestUtil.getConfiguration();
+        setUpConfigForMiniCluster(conf);
+        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-test");
+        hbaseTestUtil.startMiniCluster();
+        Class.forName(PhoenixDriver.class.getName());
+        InstanceResolver.clearSingletons();
+        // Make sure the ConnectionInfo doesn't try to pull a default Configuration
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+            @Override
+            public Configuration getConfiguration() {
+                return new Configuration(conf);
+            }
+
+            @Override
+            public Configuration getConfiguration(Configuration confToClone) {
+                Configuration copy = new Configuration(conf);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+    }
+
+    @AfterClass
+    public static synchronized void cleanUp() throws Exception {
+        InstanceResolver.clearSingletons();
+    }
+
+    @Test
+    public void testInputAndOutputConnections() throws SQLException {
+        try (Connection inputConnection = ConnectionUtil.getInputConnection(conf)) {
+            smoke(inputConnection);
+        }
+        try (Connection outputConnection = ConnectionUtil.getOutputConnection(conf)) {
+            smoke(outputConnection);
+        }
+    }
+
+    private void smoke(Connection conn) throws SQLException {
+        String table = "t" + tableCounter++;
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute("create table " + table + " (a integer primary key,b varchar)");
+            stmt.execute("upsert into " + table + " values(1,'foo')");
+            conn.commit();
+            ResultSet rs = stmt.executeQuery("select count(*) from " + table);
+            rs.next();
+            assertEquals(1, rs.getInt(1));
+        }
+    }
+
+    @Test
+    public void testZkConnections() throws SQLException {
+        String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+        String zkPort = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+        try (PhoenixConnection conn1 =
+                (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix");
+                PhoenixConnection conn2 =
+                        (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix+zk");
+                PhoenixConnection conn3 =
+                        (PhoenixConnection) DriverManager
+                                .getConnection("jdbc:phoenix+zk:" + zkQuorum + ":" + zkPort);) {
+            smoke(conn1);
+            smoke(conn2);
+            smoke(conn3);
+            assertEquals(conn1.getQueryServices(), conn2.getQueryServices());
+            assertEquals(conn1.getQueryServices(), conn3.getQueryServices());
+        }
+    }
+
+    @Test
+    public void testMasterConnections() throws SQLException {
+        assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.3.0") >= 0);
+        int masterPortString = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
+        String masterHosts = conf.get(HConstants.MASTER_ADDRS_KEY);
+        try (PhoenixConnection conn1 =
+                (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix+master");
+                PhoenixConnection conn2 =
+                        (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix+master:"
+                                + masterHosts.replaceAll(":", "\\\\:") + ":" + masterPortString);) {
+            smoke(conn1);
+            smoke(conn2);
+            assertEquals(conn1.getQueryServices(), conn2.getQueryServices());
+        }
+    }
+
+    @Test
+    public void testRPCConnections() throws SQLException {
+        assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.5.0") >= 0);
+        String masterHosts = conf.get(HConstants.MASTER_ADDRS_KEY);
+
+        try (PhoenixConnection conn1 =
+                (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix+rpc");
+                PhoenixConnection conn2 =
+                        (PhoenixConnection) DriverManager.getConnection(
+                            "jdbc:phoenix+rpc:" + masterHosts.replaceAll(":", "\\\\:"));) {
+            smoke(conn1);
+            smoke(conn2);
+            assertEquals(conn1.getQueryServices(), conn2.getQueryServices());
+        }
+
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index e342f50354..f8b46905d2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -29,8 +29,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.protobuf.RpcController;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.transaction.PhoenixTransactionClient;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
index 7b16c145cc..58282ef6ce 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
@@ -17,21 +17,31 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.mapreduce.PhoenixTTLTool;
 import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.ConfigurationFactory;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.InstanceResolver;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -39,15 +49,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 @Category(NeedsOwnMiniClusterTest.class)
 public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
 
@@ -58,6 +59,22 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
         props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
         props.put(QueryServices.PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED, Boolean.toString(true));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
+        InstanceResolver.clearSingletons();
+        // Make sure the ConnectionInfo in the tool doesn't try to pull a default Configuration
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+            @Override
+            public Configuration getConfiguration() {
+                return new Configuration(config);
+            }
+
+            @Override
+            public Configuration getConfiguration(Configuration confToClone) {
+                Configuration copy = new Configuration(config);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
     }
 
     private final long PHOENIX_TTL_EXPIRE_IN_A_SECOND = 1;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
index a1f1892982..4c002cfaaa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
@@ -55,9 +55,9 @@ import org.apache.phoenix.compat.hbase.CompatUtil;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.UpgradeRequiredException;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -112,7 +112,7 @@ public class SystemTablesCreationOnConnectionIT {
     private static class PhoenixSysCatCreationServices extends ConnectionQueryServicesImpl {
 
         PhoenixSysCatCreationServices(QueryServices services,
-                                      PhoenixEmbeddedDriver.ConnectionInfo connectionInfo, Properties info) {
+                                      ConnectionInfo connectionInfo, Properties info) {
             super(services, connectionInfo, info);
         }
 
@@ -156,9 +156,9 @@ public class SystemTablesCreationOnConnectionIT {
         @Override // public for testing
         public synchronized ConnectionQueryServices getConnectionQueryServices(String url,
                                                                                Properties info) throws SQLException {
+            QueryServicesTestImpl qsti = new QueryServicesTestImpl(getDefaultProps(), overrideProps);
             if (cqs == null) {
-                cqs = new PhoenixSysCatCreationServices(new QueryServicesTestImpl(getDefaultProps(),
-                        overrideProps), ConnectionInfo.create(url), info);
+                cqs = new PhoenixSysCatCreationServices(qsti, ConnectionInfo.create(url, qsti.getProps(), info), info);
                 cqs.init(url, info);
             }
             return cqs;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
index 9c4d03b536..647e3aae14 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -93,7 +93,7 @@ public class SystemTablesUpgradeIT extends BaseTest {
         @Override // public for testing
         public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
             if (cqs == null) {
-                cqs = new PhoenixUpgradeCountingServices(new QueryServicesTestImpl(getDefaultProps(), overrideProps), ConnectionInfo.create(url), info);
+                cqs = new PhoenixUpgradeCountingServices(new QueryServicesTestImpl(getDefaultProps(), overrideProps), ConnectionInfo.create(url, null, null), info);
                 cqs.init(url, info);
             } else if (reinitialize) {
                 cqs.init(url, info);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 4b9cc9f9a8..624c4ab0bc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -86,7 +86,8 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 /**
@@ -100,6 +101,9 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class MutableIndexFailureIT extends BaseTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MutableIndexFailureIT.class);
+
     public static volatile boolean FAIL_WRITE = false;
     public static volatile String fullTableName;
     
@@ -473,6 +477,7 @@ public class MutableIndexFailureIT extends BaseTest {
                         }
                         conn.commit();
                     } catch (SQLException e) {
+                        LOGGER.warn("Error while adding row", e);
                         return false;
                     }
                     return true;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
index 4ccc3d4cae..252e243357 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end.transform;
 
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -28,11 +29,13 @@ import org.apache.phoenix.end2end.index.SingleCellIndexIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConfigurationFactory;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.transform.SystemTransformRecord;
 import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.InstanceResolver;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -77,6 +80,22 @@ public class TransformMonitorExtendedIT extends BaseTest {
 
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
                 new ReadOnlyProps(clientProps.entrySet().iterator()));
+
+        InstanceResolver.clearSingletons();
+        // Make sure the ConnectionInfo in the tool doesn't try to pull a default Configuration
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+            @Override
+            public Configuration getConfiguration() {
+                return new Configuration(config);
+            }
+
+            @Override
+            public Configuration getConfiguration(Configuration confToClone) {
+                Configuration copy = new Configuration(config);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
     }
 
     public TransformMonitorExtendedIT() throws IOException, InterruptedException {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
index 774b8947d8..18b6a14be6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
@@ -204,7 +204,7 @@ public class FailoverPhoenixConnectionIT {
 
             try (Connection conn = createFailoverConnection()) {
                 FailoverPhoenixConnection failoverConn = (FailoverPhoenixConnection) conn;
-                assertEquals(CLUSTERS.getUrl2(), failoverConn.getWrappedConnection().getURL());
+                assertEquals(CLUSTERS.getJdbcUrl2(), failoverConn.getWrappedConnection().getURL());
                 doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
             }
         });
@@ -237,7 +237,7 @@ public class FailoverPhoenixConnectionIT {
 
             try (Connection conn = createFailoverConnection()) {
                 FailoverPhoenixConnection failoverConn = (FailoverPhoenixConnection) conn;
-                assertEquals(CLUSTERS.getUrl2(), failoverConn.getWrappedConnection().getURL());
+                assertEquals(CLUSTERS.getJdbcUrl2(), failoverConn.getWrappedConnection().getURL());
                 doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
             }
         });
@@ -286,7 +286,7 @@ public class FailoverPhoenixConnectionIT {
         LOG.info("Testing failover connection when both clusters are up and running");
         try (Connection conn = createFailoverConnection()) {
             FailoverPhoenixConnection failoverConn = conn.unwrap(FailoverPhoenixConnection.class);
-            assertEquals(CLUSTERS.getUrl2(), failoverConn.getWrappedConnection().getURL());
+            assertEquals(CLUSTERS.getJdbcUrl2(), failoverConn.getWrappedConnection().getURL());
             doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
         }
 
@@ -398,7 +398,7 @@ public class FailoverPhoenixConnectionIT {
         // The wrapped connection is still against the first cluster, and is closed
         PhoenixConnection pc = ((FailoverPhoenixConnection)conn).getWrappedConnection();
         assertNotNull(pc);
-        assertEquals(CLUSTERS.getUrl1(), pc.getURL());
+        assertEquals(CLUSTERS.getJdbcUrl1(), pc.getURL());
         assertTrue(pc.isClosed());
         doTestActionShouldFailBecauseOfFailover(conn::createStatement);
     }
@@ -485,7 +485,7 @@ public class FailoverPhoenixConnectionIT {
     public void testFailoverCanFinishWhenOneConnectionGotStuckClosing() throws Exception {
         Connection conn = createFailoverConnection();
         doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
-        assertEquals(CLUSTERS.getUrl1(),  // active connection is against the first cluster
+        assertEquals(CLUSTERS.getJdbcUrl1(),  // active connection is against the first cluster
                 conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
 
         // Spy the wrapped connection
@@ -518,7 +518,7 @@ public class FailoverPhoenixConnectionIT {
 
         try (Connection conn2 = createFailoverConnection()) {
             doTestBasicOperationsWithConnection(conn2, tableName, haGroupName);
-            assertEquals(CLUSTERS.getUrl2(), // active connection is against the second cluster
+            assertEquals(CLUSTERS.getJdbcUrl2(), // active connection is against the second cluster
                     conn2.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
         }
 
@@ -682,7 +682,7 @@ public class FailoverPhoenixConnectionIT {
     public void testFailoverTwice() throws Exception {
         try (Connection conn = createFailoverConnection()) {
             doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
-            assertEquals(CLUSTERS.getUrl1(), // active connection is against the first cluster
+            assertEquals(CLUSTERS.getJdbcUrl1(), // active connection is against the first cluster
                     conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
         }
 
@@ -691,7 +691,7 @@ public class FailoverPhoenixConnectionIT {
 
         try (Connection conn = createFailoverConnection()) {
             doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
-            assertEquals(CLUSTERS.getUrl2(), // active connection is against the second cluster
+            assertEquals(CLUSTERS.getJdbcUrl2(), // active connection is against the second cluster
                     conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
         }
 
@@ -700,7 +700,7 @@ public class FailoverPhoenixConnectionIT {
 
         try (Connection conn = createFailoverConnection()) {
             doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
-            assertEquals(CLUSTERS.getUrl1(), // active connection is against the first cluster
+            assertEquals(CLUSTERS.getJdbcUrl1(), // active connection is against the first cluster
                     conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
         }
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
index 4435236625..f7a99ba92a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
@@ -327,7 +327,7 @@ public class HighAvailabilityGroupIT {
      */
     @Test
     public void testConnectToOneCluster() throws SQLException {
-        final String url = CLUSTERS.getUrl1();
+        final String url = CLUSTERS.getJdbcUrl1();
         PhoenixConnection connection = haGroup.connectToOneCluster(url, clientProperties);
         assertEquals(url, connection.getURL());
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
index 16f9e9af6a..cc9b703005 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
@@ -80,8 +80,8 @@ public class HighAvailabilityGroupTestIT {
     // This test cannot be run in parallel since it registers/deregisters driver
 
     private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroupTestIT.class);
-    private static final String ZK1 = "zk1-1,zk1-2:2181:/hbase";
-    private static final String ZK2 = "zk2-1,zk2-2:2181:/hbase";
+    private static final String ZK1 = "zk1-1\\:2181,zk1-2\\:2181::/hbase";
+    private static final String ZK2 = "zk2-1\\:2181,zk2-2\\:2181::/hbase";
     private static final PhoenixEmbeddedDriver DRIVER = mock(PhoenixEmbeddedDriver.class);
 
     /** The client properties to create a JDBC connection. */
@@ -232,9 +232,9 @@ public class HighAvailabilityGroupTestIT {
     public void testConnectToOneClusterShouldNotFailWithDifferentHostOrderJdbcString() throws SQLException {
         // test with JDBC string
         final String hosts = "zk1-2,zk1-1:2181:/hbase";
-        final String jdbcString = String.format("jdbc:phoenix:%s", hosts);
+        final String jdbcString = String.format("jdbc:phoenix+zk:%s", hosts);
         haGroup.connectToOneCluster(jdbcString, clientProperties);
-        verify(DRIVER, times(1)).getConnectionQueryServices(eq(String.format("jdbc:phoenix:%s",ZK1)), eq(clientProperties));
+        verify(DRIVER, times(1)).getConnectionQueryServices(eq(String.format("jdbc:phoenix+zk:%s",ZK1)), eq(clientProperties));
     }
 
     /**
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
index d604774e50..1cb6c0ba79 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
@@ -78,9 +78,9 @@ public class HighAvailabilityTestingUtility {
     public static class HBaseTestingUtilityPair implements Closeable {
         private final HBaseTestingUtility hbaseCluster1 = new HBaseTestingUtility();
         private final HBaseTestingUtility hbaseCluster2 = new HBaseTestingUtility();
-        /** The host:port:/hbase format of the JDBC string for HBase cluster 1. */
+        /** The host\:port::/hbase format of the JDBC string for HBase cluster 1. */
         private String url1;
-        /** The host:port:/hbase format of the JDBC string for HBase cluster 2. */
+        /** The host\:port::/hbase format of the JDBC string for HBase cluster 2. */
         private String url2;
         private PhoenixHAAdminHelper haAdmin1;
         private PhoenixHAAdminHelper haAdmin2;
@@ -114,8 +114,8 @@ public class HighAvailabilityTestingUtility {
             String confAddress1 = hbaseCluster1.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM);
             String confAddress2 = hbaseCluster2.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM);
 
-            url1 = String.format("%s:%d:/hbase", confAddress1, hbaseCluster1.getZkCluster().getClientPort());
-            url2 = String.format("%s:%d:/hbase", confAddress2, hbaseCluster2.getZkCluster().getClientPort());
+            url1 = String.format("%s\\:%d::/hbase", confAddress1, hbaseCluster1.getZkCluster().getClientPort());
+            url2 = String.format("%s\\:%d::/hbase", confAddress2, hbaseCluster2.getZkCluster().getClientPort());
 
             haAdmin1 = new PhoenixHAAdminHelper(getUrl1(), hbaseCluster1.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
             haAdmin2 = new PhoenixHAAdminHelper(getUrl2(), hbaseCluster2.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
@@ -354,7 +354,7 @@ public class HighAvailabilityTestingUtility {
         }
 
         public String getJdbcUrl(String zkUrl) {
-            return String.format("jdbc:phoenix:%s:%s", zkUrl, PRINCIPAL);
+            return String.format("jdbc:phoenix+zk:%s:%s", zkUrl, PRINCIPAL);
         }
 
         public String getUrl1() {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index 14127f111e..7ecacd2261 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -151,12 +151,14 @@ public class ParallelPhoenixConnectionIT {
             Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsi.getUserName());
             PhoenixConnection pConn = pr.getFutureConnection1().get();
             ConnectionQueryServices cqsiFromConn = pConn.getQueryServices();
+            Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsiFromConn.getUserName());
             Assert.assertTrue(cqsi == cqsiFromConn);
             // verify connection#2
             cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl2(), clientProperties);
             Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsi.getUserName());
             pConn = pr.getFutureConnection2().get();
             cqsiFromConn = pConn.getQueryServices();
+            Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsiFromConn.getUserName());
             Assert.assertTrue(cqsi == cqsiFromConn);
         }
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/SecureUserConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/SecureUserConnectionsIT.java
index 630590f50c..47d7c0424b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/SecureUserConnectionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/SecureUserConnectionsIT.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.query.ConfigurationFactory;
 import org.apache.phoenix.util.InstanceResolver;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -235,7 +234,7 @@ public class SecureUserConnectionsIT {
         PrivilegedExceptionAction<Void> callable = new PrivilegedExceptionAction<Void>() {
             public Void run() throws Exception {
                 String url = joinUserAuthentication(BASE_URL, princ1, keytab1);
-                connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+                connections.add(ConnectionInfo.create(url,ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
                 return null;
             }
         };
@@ -261,7 +260,7 @@ public class SecureUserConnectionsIT {
         PrivilegedExceptionAction<Void> callable = new PrivilegedExceptionAction<Void>() {
             public Void run() throws Exception {
                 String url = joinUserAuthentication(BASE_URL, princ1, keytab1);
-                connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+                connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
                 return null;
             }
         };
@@ -293,7 +292,7 @@ public class SecureUserConnectionsIT {
         ugi1.doAs(new PrivilegedExceptionAction<Void>() {
             public Void run() throws Exception {
                 String url = joinUserAuthentication(BASE_URL, princ1, keytab1);
-                connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+                connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
                 return null;
             }
         });
@@ -304,7 +303,7 @@ public class SecureUserConnectionsIT {
         ugi2.doAs(new PrivilegedExceptionAction<Void>() {
             public Void run() throws Exception {
                 String url = joinUserAuthentication(BASE_URL, princ2, keytab2);
-                connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+                connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
                 return null;
             }
         });
@@ -314,7 +313,7 @@ public class SecureUserConnectionsIT {
         ugi1.doAs(new PrivilegedExceptionAction<Void>() {
             public Void run() throws Exception {
                 String url = joinUserAuthentication(BASE_URL, princ1, keytab1);
-                connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+                connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
                 return null;
             }
         });
@@ -334,19 +333,19 @@ public class SecureUserConnectionsIT {
 
         UserGroupInformation.loginUserFromKeytab(princ1, keytab1.getPath());
         // Using the same UGI should result in two equivalent ConnectionInfo objects
-        connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(1, connections.size());
         // Sanity check
         verifyAllConnectionsAreKerberosBased(connections);
 
         UserGroupInformation.loginUserFromKeytab(princ2, keytab2.getPath());
-        connections.add(ConnectionInfo.create(url2).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url2, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(2, connections.size());
         verifyAllConnectionsAreKerberosBased(connections);
 
         // Because the UGI instances are unique, so are the connections
         UserGroupInformation.loginUserFromKeytab(princ1, keytab1.getPath());
-        connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(3, connections.size());
         verifyAllConnectionsAreKerberosBased(connections);
     }
@@ -360,13 +359,13 @@ public class SecureUserConnectionsIT {
 
         UserGroupInformation.loginUserFromKeytab(princ1, keytab1.getPath());
         // Using the same UGI should result in two equivalent ConnectionInfo objects
-        connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(1, connections.size());
         // Sanity check
         verifyAllConnectionsAreKerberosBased(connections);
 
         // Because the UGI instances are unique, so are the connections
-        connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(1, connections.size());
     }
 
@@ -378,13 +377,13 @@ public class SecureUserConnectionsIT {
 
         // Using the same UGI should result in two equivalent ConnectionInfo objects
         final String url = joinUserAuthentication(BASE_URL, princ1, keytab1);
-        connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(1, connections.size());
         // Sanity check
         verifyAllConnectionsAreKerberosBased(connections);
 
         // Because the UGI instances are unique, so are the connections
-        connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(1, connections.size());
     }
 
@@ -399,18 +398,18 @@ public class SecureUserConnectionsIT {
         final String url2 = joinUserAuthentication(BASE_URL, princ2, keytab2);
 
         // Using the same UGI should result in two equivalent ConnectionInfo objects
-        connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(1, connections.size());
         // Sanity check
         verifyAllConnectionsAreKerberosBased(connections);
 
         // Because the UGI instances are unique, so are the connections
-        connections.add(ConnectionInfo.create(url2).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url2, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(2, connections.size());
         verifyAllConnectionsAreKerberosBased(connections);
 
         // Using the same UGI should result in two equivalent ConnectionInfo objects
-        connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(3, connections.size());
         // Sanity check
         verifyAllConnectionsAreKerberosBased(connections);
@@ -427,29 +426,29 @@ public class SecureUserConnectionsIT {
         final String url2 = joinUserAuthentication(BASE_URL, princ2, keytab2);
 
         // Using the same UGI should result in two equivalent ConnectionInfo objects
-        connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(1, connections.size());
         // Sanity check
         verifyAllConnectionsAreKerberosBased(connections);
 
         // Logging in as the same user again should not duplicate connections
-        connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(1, connections.size());
         // Sanity check
         verifyAllConnectionsAreKerberosBased(connections);
 
         // Add a second one.
-        connections.add(ConnectionInfo.create(url2).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url2, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(2, connections.size());
         verifyAllConnectionsAreKerberosBased(connections);
 
         // Again, verify this user is not duplicated
-        connections.add(ConnectionInfo.create(url2).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url2, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(2, connections.size());
         verifyAllConnectionsAreKerberosBased(connections);
 
         // Because the UGI instances are unique, so are the connections
-        connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+        connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
         assertEquals(3, connections.size());
         verifyAllConnectionsAreKerberosBased(connections);
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index f936796d34..441edcee7d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -29,8 +29,8 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.CommitException;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.ConfigurationFactory;
@@ -1478,7 +1478,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
     private static class PhoenixMetricsTestingQueryServices extends ConnectionQueryServicesImpl {
 
         PhoenixMetricsTestingQueryServices(QueryServices services,
-                PhoenixEmbeddedDriver.ConnectionInfo connectionInfo, Properties info) {
+                ConnectionInfo connectionInfo, Properties info) {
             super(services, connectionInfo, info);
         }
 
@@ -1516,10 +1516,12 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
         @Override public synchronized ConnectionQueryServices getConnectionQueryServices(String url,
                 Properties info) throws SQLException {
             if (cqs == null) {
+                QueryServicesTestImpl qsti =
+                        new QueryServicesTestImpl(getDefaultProps(), overrideProps);
                 cqs =
                         new PhoenixMetricsTestingQueryServices(
-                                new QueryServicesTestImpl(getDefaultProps(), overrideProps),
-                                ConnectionInfo.create(url), info);
+                            qsti,
+                                ConnectionInfo.create(url, qsti.getProps(), info), info);
                 cqs.init(url, info);
             }
             return cqs;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilterIT.java
index 943e21cf4b..cbeaedf682 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilterIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.replication;
 
 import java.io.IOException;
+import java.sql.DriverManager;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -392,17 +393,14 @@ public class SystemCatalogWALEntryFilterIT extends ParallelStatsDisabledIT {
   private static void dropTenantView() throws Exception {
     Properties tenantProperties = new Properties();
     tenantProperties.setProperty("TenantId", TENANT_ID);
-    try (java.sql.Connection connection =
-        ConnectionUtil.getInputConnection(getUtility().getConfiguration(), tenantProperties)) {
+    try (java.sql.Connection connection = DriverManager.getConnection(getUrl(), tenantProperties)) {
       connection.createStatement().execute(DROP_TENANT_VIEW_SQL);
       connection.commit();
     }
   }
 
   private static void dropNonTenantView() throws Exception {
-    try (java.sql.Connection connection =
-        ConnectionUtil.getInputConnection(getUtility().getConfiguration(), new Properties())) {
-
+    try (java.sql.Connection connection = DriverManager.getConnection(getUrl())) {
       connection.createStatement().execute(DROP_NONTENANT_VIEW_SQL);
     }
   }
@@ -410,16 +408,14 @@ public class SystemCatalogWALEntryFilterIT extends ParallelStatsDisabledIT {
   private static void createTenantView() throws Exception {
     Properties tenantProperties = new Properties();
     tenantProperties.setProperty("TenantId", TENANT_ID);
-    try (java.sql.Connection connection =
-        ConnectionUtil.getInputConnection(getUtility().getConfiguration(), tenantProperties)) {
+    try (java.sql.Connection connection = DriverManager.getConnection(getUrl(), tenantProperties)) {
       connection.createStatement().execute(CREATE_TENANT_VIEW_SQL);
       connection.commit();
     }
   }
 
   private static void createNonTenantView() throws Exception {
-    try (java.sql.Connection connection =
-        ConnectionUtil.getInputConnection(getUtility().getConfiguration(), new Properties())) {
+    try (java.sql.Connection connection = DriverManager.getConnection(getUrl())) {
       connection.createStatement().execute(CREATE_NONTENANT_VIEW_SQL);
       connection.commit();
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/AbstractRPCConnectionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/AbstractRPCConnectionInfo.java
new file mode 100644
index 0000000000..869f40d2d5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/AbstractRPCConnectionInfo.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+/**
+ * Encapsulates the common logic for HBase RPC based ConnectionInfo classes.
+ * <p>
+ * Holds the bootstrap server list, and provides the shared URL parsing and normalization
+ * logic through the nested {@link Builder}.
+ */
+public abstract class AbstractRPCConnectionInfo extends ConnectionInfo {
+
+    private static final String MASTER_ADDRS_KEY = "hbase.masters";
+    private static final String MASTER_HOSTNAME_KEY = "hbase.master.hostname";
+
+    protected String bootstrapServers;
+
+    /** Returns the bootstrap server list. The method name keeps its original spelling. */
+    public String getBoostrapServers() {
+        return bootstrapServers;
+    }
+
+    protected AbstractRPCConnectionInfo(boolean isConnectionless, String principal, String keytab,
+            User user, String haGroup) {
+        super(isConnectionless, principal, keytab, user, haGroup);
+    }
+
+    /** Always throws UnsupportedOperationException for this class. */
+    @Override
+    public String getZookeeperConnectionString() {
+        throw new UnsupportedOperationException("MasterRegistry is used");
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((bootstrapServers == null) ? 0 : bootstrapServers.hashCode());
+        // Port is already provided in or normalized into bootstrapServers
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        // The cast below relies on super.equals() rejecting other types - TODO confirm
+        if (!super.equals(obj)) {
+            return false;
+        }
+        AbstractRPCConnectionInfo other = (AbstractRPCConnectionInfo) obj;
+        // Port is already provided in or normalized into bootstrapServers
+        if (bootstrapServers == null) {
+            return other.bootstrapServers == null;
+        }
+        return bootstrapServers.equals(other.bootstrapServers);
+    }
+
+    /**
+     * Renders the bootstrap servers with every ':' escaped as '\:', followed by the principal
+     * and keytab fields if they are set.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        if (bootstrapServers != null) {
+            // Literal (non-regex) replacement; null is tolerated, as in equals() and hashCode()
+            sb.append(bootstrapServers.replace(":", "\\:"));
+        }
+        if (anyNotNull(principal, keytab)) {
+            // The port and root node fields are left empty before the principal
+            sb.append(principal == null ? ":::" : ":::" + principal);
+        }
+        if (anyNotNull(keytab)) {
+            sb.append(keytab == null ? ":" : ":" + keytab);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Abstract Builder parent for HBase RPC based ConnectionInfo classes.
+     */
+    protected abstract static class Builder extends ConnectionInfo.Builder {
+        String hostsList;
+        String portString;
+        Integer port;
+
+        public Builder(String url, Configuration config, ReadOnlyProps props, Properties info) {
+            super(url, config, props, info);
+        }
+
+        /** Runs, in order: parse, normalize, handleKerberosAndLogin, setHaGroup, build. */
+        @Override
+        protected ConnectionInfo create() throws SQLException {
+            parse();
+            normalize();
+            handleKerberosAndLogin();
+            setHaGroup();
+            return build();
+        }
+
+        /**
+         * Splits the URL into the hosts list, port, principal and keytab fields.
+         * @throws SQLException if the URL has too many parts, a non-empty third part, or a port
+         *             that is not a non-negative integer
+         */
+        private void parse() throws SQLException {
+            StringTokenizer tokenizer = getTokenizerWithoutProtocol();
+
+            // Unlike for the ZK URL, there is no heuristics to figure out missing parts.
+            // Unspecified parts inside the URL must be indicated by ::.
+            boolean wasSeparator = true;
+            boolean first = true;
+            ArrayList<String> parts = new ArrayList<>(7);
+            String token = null;
+            while (tokenizer.hasMoreTokens()
+                    && !(token = tokenizer.nextToken()).equals(TERMINATOR)) {
+                if (DELIMITERS.contains(token)) {
+                    if (wasSeparator && !first) {
+                        parts.add(null);
+                    }
+                    wasSeparator = true;
+                } else {
+                    parts.add(token);
+                    wasSeparator = false;
+                }
+                first = false;
+                if (parts.size() > 6) {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+
+            if (parts.size() == 6) {
+                // We could check for FileSystems.getDefault().getSeparator()), but then
+                // we wouldn't be able to test on Unix.
+                String keytabHead = parts.get(4);
+                String keytabTail = parts.get(5);
+                // Either half is null if it was left unspecified (::) in the URL
+                if (keytabHead != null && keytabTail != null && keytabTail.startsWith("\\")) {
+                    // Reconstruct windows path
+                    parts.set(4, keytabHead + ":" + keytabTail);
+                    parts.remove(5);
+                } else {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+
+            while (parts.size() < 7) {
+                parts.add(null);
+            }
+            hostsList = parts.get(0);
+            portString = parts.get(1);
+            if (portString != null) {
+                port = parsePortOrFail(portString);
+            }
+            if (parts.get(2) != null && !parts.get(2).isEmpty()) {
+                // This MUST be empty
+                throw getMalFormedUrlException(url);
+            }
+            principal = parts.get(3);
+            keytab = parts.get(4);
+        }
+
+        /**
+         * Parses a port number taken from the URL.
+         * @throws SQLException if the value is not a non-negative integer
+         */
+        private int parsePortOrFail(String value) throws SQLException {
+            int parsed;
+            try {
+                parsed = Integer.parseInt(value);
+            } catch (NumberFormatException e) {
+                throw getMalFormedUrlException(url);
+            }
+            if (parsed < 0) {
+                throw getMalFormedUrlException(url);
+            }
+            return parsed;
+        }
+
+        /**
+         * Normalizes the parsed hosts list and port, falling back to the configured defaults.
+         * @throws SQLException if the port is invalid or no master address can be determined
+         */
+        protected void normalizeMaster() throws SQLException {
+            if (hostsList != null && hostsList.isEmpty()) {
+                hostsList = null;
+            }
+            if (portString != null && portString.isEmpty()) {
+                portString = null;
+            }
+            if (portString != null) {
+                port = parsePortOrFail(portString);
+            }
+
+            if (port == null) {
+                port = getDefaultMasterPort();
+            }
+            // At this point, port is always set (a configured 0 was replaced by the default)
+
+            if (isConnectionless) {
+                // We probably don't create connectionless MasterConnectionInfo objects
+                // NOTE(review): port is never null here, so this always throws - confirm intent
+                if (hostsList != null || port != null) {
+                    throw getMalFormedUrlException(url);
+                } else {
+                    return;
+                }
+            }
+
+            if (hostsList == null) {
+                hostsList = getMasterAddr(port);
+                if (hostsList == null) {
+                    throw getMalFormedUrlException(
+                        "Hbase masters are not specified and in URL, and are not set in the configuration files: "
+                                + url);
+                }
+            } else {
+                hostsList = hostsList.replaceAll("=", ":");
+            }
+
+            hostsList = normalizeHostsList(hostsList, port);
+        }
+
+        /**
+         * Adapted from org.apache.hadoop.hbase.client.MasterRegistry (which is private). Supplies
+         * the default master port, without modifying the builder's port field.
+         * @return the port set under HConstants.MASTER_PORT, or the HBase default if unset or 0
+         */
+        private int getDefaultMasterPort() {
+            String configuredPort = get(HConstants.MASTER_PORT);
+            if (configuredPort == null) {
+                return HConstants.DEFAULT_MASTER_PORT;
+            }
+            int masterPort = Integer.parseInt(configuredPort);
+            // Master port may be set to 0. We should substitute the default port in that case.
+            return masterPort == 0 ? HConstants.DEFAULT_MASTER_PORT : masterPort;
+        }
+
+        /**
+         * Adapted from org.apache.hadoop.hbase.client.MasterRegistry. Builds the default master
+         * address end point if it is not specified in the configuration.
+         * @return the master address(es), or null if neither the address nor the hostname is set
+         */
+        private String getMasterAddr(int port) {
+            String masterAddrFromConf = get(MASTER_ADDRS_KEY);
+            if (!Strings.isNullOrEmpty(masterAddrFromConf)) {
+                return masterAddrFromConf;
+            }
+            String hostname = get(MASTER_HOSTNAME_KEY);
+            if (hostname != null) {
+                return String.format("%s:%d", hostname, port);
+            } else {
+                return null;
+            }
+        }
+
+        protected abstract ConnectionInfo build();
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
index 348cae0282..aa28c6e85f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
@@ -93,6 +93,7 @@ public class ClusterRoleRecord {
             @JsonProperty("version") long version) {
         this.haGroupName = haGroupName;
         this.policy = policy;
+        // Do we really need to normalize here?
         zk1 = JDBCUtil.formatZookeeperUrl(zk1);
         zk2 = JDBCUtil.formatZookeeperUrl(zk2);
         // Ignore the given order of url1 and url2
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ConnectionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ConnectionInfo.java
new file mode 100644
index 0000000000..b982b1d6a1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ConnectionInfo.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to encapsulate connection info for HBase
+ * @since 0.1.1
+ */
+public abstract class ConnectionInfo {
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ConnectionInfo.class);
+    protected static final Object KERBEROS_LOGIN_LOCK = new Object();
+    protected static final char WINDOWS_SEPARATOR_CHAR = '\\';
+    protected static final String REALM_EQUIVALENCY_WARNING_MSG =
+            "Provided principal does not contain a realm and the default realm cannot be"
+            + " determined. Ignoring realm equivalency check.";
+    protected static final String TERMINATOR = "" + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+    protected static final String DELIMITERS = TERMINATOR + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+    protected static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
+            "hbase.client.registry.impl";
+
+    protected static final boolean HAS_MASTER_REGISTRY;
+    protected static final boolean HAS_RPC_REGISTRY;
+
+    static { // set the registry flags from the HBase version
+        String version = VersionInfo.getVersion();
+        if (VersionInfo.getMajorVersion(version) >= 3) {
+            HAS_MASTER_REGISTRY = true; // major version 3 or later: both registries
+            HAS_RPC_REGISTRY = true;
+        } else {
+            if (VersionInfo.compareVersion(VersionInfo.getVersion(), "2.3.0") < 0) {
+                HAS_MASTER_REGISTRY = false; // older than 2.3.0: neither registry
+                HAS_RPC_REGISTRY = false;
+            } else if (VersionInfo.compareVersion(VersionInfo.getVersion(), "2.5.0") < 0) {
+                HAS_MASTER_REGISTRY = true; // 2.3.0 up to, but excluding, 2.5.0: Master only
+                HAS_RPC_REGISTRY = false;
+            } else {
+                HAS_MASTER_REGISTRY = true; // 2.5.0 and later 2.x: both registries
+                HAS_RPC_REGISTRY = true;
+            }
+        }
+    }
+
+    protected static SQLException getMalFormedUrlException(String url) {
+        return new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+                .setMessage(url).build().buildException();
+    }
+
+    protected final boolean isConnectionless;
+    protected final String principal;
+    protected final String keytab;
+    protected final User user;
+    protected final String haGroup;
+
+    protected ConnectionInfo(boolean isConnectionless, String principal, String keytab, User user,
+            String haGroup) {
+        super();
+        this.isConnectionless = isConnectionless;
+        this.principal = principal;
+        this.keytab = keytab;
+        this.user = user;
+        this.haGroup = haGroup;
+    }
+
+    protected static String unescape(String escaped) {
+        return escaped.replaceAll("\\\\:", "=");
+    }
+
+    public static ConnectionInfo createNoLogin(String url, ReadOnlyProps props, Properties info)
+            throws SQLException {
+        Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        return create(url, conf, props, info, true);
+    }
+
+    public static ConnectionInfo create(String url, ReadOnlyProps props, Properties info)
+            throws SQLException {
+        Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        return create(url, conf, props, info);
+    }
+
+    public static ConnectionInfo createNoLogin(String url, Configuration configuration,
+            ReadOnlyProps props, Properties info) throws SQLException {
+        return create(url, configuration, props, info, true);
+    }
+
+    public static ConnectionInfo create(String url, Configuration configuration,
+            ReadOnlyProps props, Properties info) throws SQLException {
+        return create(url, configuration, props, info, false);
+    }
+
+    public static ConnectionInfo create(String url, Configuration configuration,
+            ReadOnlyProps props, Properties info, boolean doNotLogin) throws SQLException {
+        // registry-independent URL preprocessing
+        url = url == null ? "" : url;
+        url = unescape(url);
+
+        // Assume missing prefix
+        if (url.isEmpty()) {
+            url = PhoenixRuntime.JDBC_PROTOCOL;
+        }
+        if (!url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+            url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + url;
+        }
+
+        if (configuration == null) {
+            configuration = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        }
+
+        Builder builder;
+
+        if (url.toLowerCase().startsWith(PhoenixRuntime.JDBC_PROTOCOL_ZK)) {
+            builder = new ZKConnectionInfo.Builder(url, configuration, props, info);
+        } else if (url.toLowerCase().startsWith(PhoenixRuntime.JDBC_PROTOCOL_MASTER)) {
+            builder = new MasterConnectionInfo.Builder(url, configuration, props, info);
+        } else if (url.toLowerCase().startsWith(PhoenixRuntime.JDBC_PROTOCOL_RPC)) {
+            builder = new RPCConnectionInfo.Builder(url, configuration, props, info);
+        } else if (url.toLowerCase().startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+            // The generic protocol was specified. Try to Determine the protocol from the config
+            if (MasterConnectionInfo.isMaster(configuration)) {
+                builder = new MasterConnectionInfo.Builder(url, configuration, props, info);
+            } else if (RPCConnectionInfo.isRPC(configuration)) {
+                builder = new RPCConnectionInfo.Builder(url, configuration, props, info);
+            } else if (ZKConnectionInfo.isZK(configuration)) {
+                builder = new ZKConnectionInfo.Builder(url, configuration, props, info);
+            } else {
+                // No registry class set in config. Use version-dependent default
+                if (VersionInfo.getMajorVersion(VersionInfo.getVersion()) >= 3) {
+                    builder = new RPCConnectionInfo.Builder(url, configuration, props, info);
+                } else {
+                    builder = new ZKConnectionInfo.Builder(url, configuration, props, info);
+                }
+            }
+        } else {
+            throw getMalFormedUrlException(url);
+        }
+
+        builder.setDoNotLogin(doNotLogin);
+        return builder.create();
+    }
+
+    protected static List<String> handleWindowsKeytab(String url, List<String> parts)
+            throws SQLException {
+
+        if (parts.size() == 7) {
+            // We could check for FileSystems.getDefault().getSeparator(), but then
+            // we wouldn't be able to test on Unix.
+            if (parts.get(6) != null && parts.get(6).startsWith("\\")) {
+                // Reconstruct the Windows path that was split at its ':' and drop the extra part
+                parts.set(5, parts.get(5) + ":" + parts.get(6));
+                parts.remove(6);
+            } else {
+                throw getMalFormedUrlException(url);
+            }
+        }
+
+        return parts;
+    }
+
+    // Visible for testing
+    static boolean isSameName(String currentName, String newName) throws IOException {
+        return isSameName(currentName, newName, null, getDefaultKerberosRealm());
+    }
+
+    /**
+     * Computes the default kerberos realm if one is available. If one cannot be computed, null is
+     * returned.
+     * @return The default kerberos realm, or null.
+     */
+    static String getDefaultKerberosRealm() {
+        try {
+            return KerberosUtil.getDefaultRealm();
+        } catch (Exception e) {
+            if (LOGGER.isDebugEnabled()) {
+                // Include the stacktrace at DEBUG
+                LOGGER.debug(REALM_EQUIVALENCY_WARNING_MSG, e);
+            } else {
+                // Limit the content at WARN
+                LOGGER.warn(REALM_EQUIVALENCY_WARNING_MSG);
+            }
+        }
+        return null;
+    }
+
+    static boolean isSameName(String currentName, String newName, String hostname)
+            throws IOException {
+        return isSameName(currentName, newName, hostname, getDefaultKerberosRealm());
+    }
+
+    static boolean isSameName(String currentName, String newName, String hostname,
+            String defaultRealm) throws IOException {
+        final boolean newNameContainsRealm = newName.indexOf('@') != -1;
+        // Make sure to replace "_HOST" if it exists before comparing the principals.
+        if (newName.contains(org.apache.hadoop.security.SecurityUtil.HOSTNAME_PATTERN)) {
+            if (newNameContainsRealm) {
+                newName =
+                        org.apache.hadoop.security.SecurityUtil.getServerPrincipal(newName,
+                            hostname);
+            } else {
+                // If the principal ends with "/_HOST", replace "_HOST" with the hostname.
+                if (newName.endsWith("/_HOST")) {
+                    newName = newName.substring(0, newName.length() - 5) + hostname;
+                }
+            }
+        }
+        // The new name doesn't contain a realm and we could compute a default realm
+        if (!newNameContainsRealm && defaultRealm != null) {
+            return currentName.equals(newName + "@" + defaultRealm);
+        }
+        // We expect both names to contain a realm, so we can do a simple equality check
+        return currentName.equals(newName);
+    }
+
+    /**
+     * Creates a copy of the HBase configuration with the CQS and connection properties merged in.
+     * @param configIn configuration to copy; if null, {@code new Configuration(false)} is used
+     * @param props CQS properties
+     * @param info JDBC connection properties
+     * @return merged configuration
+     */
+    protected static Configuration mergeConfiguration(Configuration configIn, ReadOnlyProps props,
+            Properties info) {
+        // TODO is cloning the configuration a performance problem ?
+        Configuration config;
+        if (configIn != null) {
+            config = new Configuration(configIn);
+        } else {
+            // props/info contains everything
+            config = new Configuration(false);
+        }
+        // Add QueryServices properties
+        if (props != null) {
+            for (Entry<String, String> entry : props) {
+                config.set(entry.getKey(), entry.getValue());
+            }
+        }
+        // Add any user-provided properties (via DriverManager); set last, so they take precedence
+        if (info != null) {
+            for (Object key : info.keySet()) {
+                config.set((String) key, info.getProperty((String) key));
+            }
+        }
+        return config;
+    }
+
+    protected Map<String, String> getCommonProps() {
+        Map<String, String> connectionProps = new HashMap<>();
+        if (getPrincipal() != null && getKeytab() != null) {
+            connectionProps.put(QueryServices.HBASE_CLIENT_PRINCIPAL, getPrincipal());
+            connectionProps.put(QueryServices.HBASE_CLIENT_KEYTAB, getKeytab());
+        }
+        return connectionProps;
+    }
+
+    public abstract ReadOnlyProps asProps();
+
+    public boolean isConnectionless() {
+        return isConnectionless;
+    }
+
+    public String getKeytab() {
+        return keytab;
+    }
+
+    public String getPrincipal() {
+        return principal;
+    }
+
+    public User getUser() {
+        return user;
+    }
+
+    public String getHaGroup() {
+        return haGroup;
+    }
+
+    public abstract String toUrl();
+
+    public abstract String getZookeeperConnectionString();
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        ConnectionInfo other = (ConnectionInfo) obj;
+        // `user` is guaranteed to be non-null
+        if (!other.user.equals(user)) return false;
+        if (principal == null) {
+            if (other.principal != null) return false;
+        } else if (!principal.equals(other.principal)) return false;
+        if (keytab == null) {
+            if (other.keytab != null) return false;
+        } else if (!keytab.equals(other.keytab)) return false;
+        if (haGroup == null) {
+            if (other.haGroup != null) return false;
+        } else if (!haGroup.equals(other.haGroup)) return false;
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((principal == null) ? 0 : principal.hashCode());
+        result = prime * result + ((keytab == null) ? 0 : keytab.hashCode());
+        result = prime * result + ((haGroup == null) ? 0 : haGroup.hashCode());
+        // `user` is guaranteed to be non-null
+        result = prime * result + user.hashCode();
+        return result;
+    }
+
+    protected boolean anyNotNull(Object... params) {
+        for (Object param : params) {
+            if (param != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Parent of the Builder classes for the immutable ConnectionInfo classes
+     *
+     * @since
+     */
+    protected abstract static class Builder {
+
+        protected boolean isConnectionless;
+        protected String principal;
+        protected String keytab;
+        protected User user;
+        protected String haGroup;
+        protected boolean doNotLogin = false;
+
+        // Only used for building, not part of ConnectionInfo
+        protected final String url;
+        protected final Configuration config;
+        protected final ReadOnlyProps props;
+        protected final Properties info;
+
+        public Builder(String url, Configuration config, ReadOnlyProps props, Properties info) {
+            this.config = config;
+            this.url = url;
+            this.props = props;
+            this.info = info;
+        }
+
+        protected abstract ConnectionInfo create() throws SQLException;
+
+        protected abstract void normalize() throws SQLException;
+
+        protected String get(String key, String defValue) { // order: info, props, config
+            String result = null;
+            if (info != null) {
+                result = info.getProperty(key);
+            }
+            if (result == null) {
+                if (props != null) {
+                    result = props.get(key);
+                }
+                if (result == null) {
+                    result = config.get(key, defValue); // defValue applies only at this last step
+                }
+            }
+            return result;
+        }
+
+        protected String get(String key) {
+            return get(key, null);
+        }
+
+        protected void setHaGroup() {
+            if (info != null) {
+                haGroup = info.getProperty(HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR);
+            }
+        }
+
+        protected void setDoNotLogin(boolean doNotLogin) {
+            this.doNotLogin = doNotLogin;
+        }
+
+        protected void handleKerberosAndLogin() throws SQLException {
+            // Previously we have ignored the kerberos properties defined in hbase-site.xml,
+            // but now we use them
+            try {
+                this.user = User.getCurrent();
+            } catch (IOException e) {
+                throw new RuntimeException("Couldn't get the current user!!", e);
+            }
+            if (null == this.user) {
+                throw new RuntimeException("Acquired null user which should never happen");
+            }
+
+            if (isConnectionless) {
+                return; // no principal/keytab handling or login when connectionless
+            }
+
+            if (principal == null) {
+                principal = get(QueryServices.HBASE_CLIENT_PRINCIPAL);
+            }
+            if (keytab == null) {
+                keytab = get(QueryServices.HBASE_CLIENT_KEYTAB);
+            }
+            if ((principal == null) && (keytab != null)) { // keytab without principal is invalid
+                throw getMalFormedUrlException(url);
+            }
+            // We allow specifying a principal without a keytab, in which case
+            // the principal is not used for kerberos, but is set as the connection user
+            if (principal != null && keytab != null && !doNotLogin) {
+                // PHOENIX-3189 Because ConnectionInfo is immutable, we must make sure all parts of
+                // it are correct before construction. This also requires the Kerberos user
+                // credentials object, since those are compared by reference and not by value.
+                // If the user provided a principal and keytab via the JDBC url, we must make sure
+                // that the Kerberos login happens *before* we construct the ConnectionInfo object.
+                // Otherwise, the use of ConnectionInfo to determine when ConnectionQueryServices
+                // impl's should be reused will be broken.
+                // The principal and keytab may also have been taken from the properties or the
+                // configuration above instead of the URL.
+                try {
+                    // Check if we need to authenticate with kerberos so that we cache the correct
+                    // ConnectionInfo
+                    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+                    if (!currentUser.hasKerberosCredentials()
+                            || !isSameName(currentUser.getUserName(), principal)) {
+                        synchronized (KERBEROS_LOGIN_LOCK) {
+                            // Re-check the current user inside the lock, it might have changed
+                            // since we checked last. Don't want to re-login if it's the same
+                            // user.
+                            currentUser = UserGroupInformation.getCurrentUser();
+                            if (!currentUser.hasKerberosCredentials()
+                                    || !isSameName(currentUser.getUserName(), principal)) {
+                                LOGGER.info("Trying to connect to a secure cluster as {} "
+                                        + "with keytab {}",
+                                    principal, keytab);
+                                // We are intentionally changing the passed in Configuration object
+                                if (null != principal) {
+                                    config.set(QueryServices.HBASE_CLIENT_PRINCIPAL, principal);
+                                }
+                                if (null != keytab) {
+                                    config.set(QueryServices.HBASE_CLIENT_KEYTAB, keytab);
+                                }
+                                UserGroupInformation.setConfiguration(config);
+                                User.login(config, QueryServices.HBASE_CLIENT_KEYTAB,
+                                    QueryServices.HBASE_CLIENT_PRINCIPAL, null);
+                                user = User.getCurrent();
+                                LOGGER.info("Successful login to secure cluster");
+                            }
+                        }
+                    } else {
+                        // The user already has Kerberos creds, so there isn't anything to change in
+                        // the ConnectionInfo.
+                        LOGGER.debug("Already logged in as {}", currentUser);
+                    }
+                } catch (IOException e) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
+                            .setRootCause(e).build().buildException();
+                }
+            } else { // also reached when doNotLogin is set
+                LOGGER.debug("Principal and keytab not provided, not attempting Kerberos login");
+            }
+        }
+
+        /** Normalizes a ','-separated host list: locale-independent lower case, ports, sorted. */
+        protected String normalizeHostsList(String quorum, Integer defaultPort)
+                throws SQLException {
+            // Entries are split on ':' here; presumably callers map '=' back first (TODO confirm)
+            String[] quorumParts = quorum.split(",");
+            String[] normalizedParts = new String[quorumParts.length];
+            for (int i = 0; i < quorumParts.length; i++) {
+                String hostPort = quorumParts[i].trim().toLowerCase(java.util.Locale.ROOT);
+                String[] hostAndPort = hostPort.split(":");
+                if (hostAndPort.length == 1) {
+                    normalizedParts[i] = hostAndPort[0].trim() + ":" + defaultPort;
+                } else if (hostAndPort.length == 2) {
+                    normalizedParts[i] = hostPort;
+                } else {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+            // Sorting the host:port strings may give an unexpected, but consistent, order.
+            Arrays.sort(normalizedParts);
+            return String.join(",", normalizedParts);
+            // TODO HBase will perform a further reverse lookup based normalization on the hosts,
+            // but we skip that.
+            // In the unlikely worst case, we generate separate CQSI objects instead of sharing them
+        }
+
+        protected StringTokenizer getTokenizerWithoutProtocol() throws SQLException {
+            StringTokenizer tokenizer = new StringTokenizer(url, DELIMITERS, true);
+            try {
+                // Walk the first three tokens "jdbc", ":", "phoenix"/"phoenix+master"/"phoenix+zk"
+                // This should succeed, as we check for the "jdbc:phoenix" prefix when accepting the
+                // URL
+                if (!tokenizer.nextToken().toLowerCase().equals("jdbc")) {
+                    throw new Exception();
+                }
+                if (!tokenizer.nextToken().toLowerCase().equals(":")) {
+                    throw new Exception();
+                }
+                if (!tokenizer.nextToken().toLowerCase().startsWith("phoenix")) {
+                    throw new Exception();
+                }
+            } catch (Exception e) { // incl. NoSuchElementException when tokens run out
+                throw getMalFormedUrlException(url);
+            }
+            return tokenizer;
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
index 75ac6cba82..fb2137d106 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -357,14 +356,13 @@ public class HighAvailabilityGroup {
             jdbcUrl = jdbcUrl.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
         }
         Preconditions.checkArgument(!StringUtils.isEmpty(jdbcUrl), "JDBC url is empty!");
-        String[] urls = jdbcUrl.split(":");
-        if (urls.length == 1) {
-            zkUrl = String.format("%s:%s", urls[0], HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-        } else if (urls.length == 2 || urls.length == 3) {
-            zkUrl = String.format("%s:%s", urls[0], urls[1]);
-        } else {
+        jdbcUrl = jdbcUrl.replaceAll("\\\\:", "=");
+        String[] parts = jdbcUrl.split(":");
+        if (parts.length == 0 || parts.length > 3) {
             throw new IllegalArgumentException("Invalid JDBC url!" + jdbcUrl);
         }
+        // The URL is already normalised
+        zkUrl = parts[0].replaceAll("=", ":");
 
         // Get timeout and retry counts
         String connectionTimeoutMsProp = properties.getProperty(
@@ -568,7 +566,8 @@ public class HighAvailabilityGroup {
         if (state != State.READY || connection == null) {
             return false;
         }
-        return roleRecord.getActiveUrl().equals(Optional.of(connection.getURL()));
+        return roleRecord.getActiveUrl()
+                .equals(Optional.of(JDBCUtil.formatZookeeperUrl(connection.getURL())));
     }
 
     /**
@@ -583,8 +582,8 @@ public class HighAvailabilityGroup {
         if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
             Preconditions.checkArgument(url.length() > PhoenixRuntime.JDBC_PROTOCOL.length(),
                     "The URL '" + url + "' is not a valid Phoenix connection string");
-            url = JDBCUtil.formatZookeeperUrl(url.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1));
         }
+        url = JDBCUtil.formatZookeeperUrl(url);
         Preconditions.checkArgument(url.equals(info.getUrl1()) || url.equals(info.getUrl2()),
                 "The URL '" + url + "' does not belong to this HA group " + info);
 
@@ -603,7 +602,7 @@ public class HighAvailabilityGroup {
         Preconditions.checkArgument(driver instanceof PhoenixEmbeddedDriver,
                 "No JDBC driver is registered for Phoenix high availability (HA) framework");
         return ((PhoenixEmbeddedDriver) driver).getConnectionQueryServices(jdbcString, properties)
-                .connect(url, properties);
+                .connect(jdbcString, properties);
     }
 
     @VisibleForTesting
@@ -790,7 +789,7 @@ public class HighAvailabilityGroup {
             Preconditions.checkArgument(zkUrl.equals(getUrl1()) || zkUrl.equals(getUrl2()),
                     "The URL '" + zkUrl + "' does not belong to this HA group " + this);
             StringBuilder sb = new StringBuilder();
-            sb.append(PhoenixRuntime.JDBC_PROTOCOL);
+            sb.append(PhoenixRuntime.JDBC_PROTOCOL_ZK);
             sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
             sb.append(zkUrl);
             if (!Strings.isNullOrEmpty(additionalJDBCParams)) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/MasterConnectionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/MasterConnectionInfo.java
new file mode 100644
index 0000000000..eef75e3402
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/MasterConnectionInfo.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+/**
+ * ConnectionInfo used for org.apache.hadoop.hbase.client.MasterRegistry
+ *
+ */
+public class MasterConnectionInfo extends AbstractRPCConnectionInfo {
+
+    private static final String MASTER_REGISTRY_CLASS_NAME =
+            "org.apache.hadoop.hbase.client.MasterRegistry";
+
+    protected MasterConnectionInfo(boolean isConnectionless, String principal, String keytab,
+            User user, String haGroup, String bootstrapServers) {
+        super(isConnectionless, principal, keytab, user, haGroup);
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    @Override
+    public ReadOnlyProps asProps() {
+        if (isConnectionless) {
+            return ReadOnlyProps.EMPTY_PROPS;
+        }
+
+        Map<String, String> connectionProps = getCommonProps();
+        connectionProps.put(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+            MASTER_REGISTRY_CLASS_NAME);
+
+        if (bootstrapServers != null) {
+            // This is already normalized to include ports
+            connectionProps.put(HConstants.MASTER_ADDRS_KEY, bootstrapServers);
+        }
+
+        return connectionProps.isEmpty() ? ReadOnlyProps.EMPTY_PROPS
+                : new ReadOnlyProps(connectionProps.entrySet().iterator());
+    }
+
+    @Override
+    public String toUrl() {
+        return PhoenixRuntime.JDBC_PROTOCOL_MASTER + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+                + toString();
+    }
+
+    public static boolean isMaster(Configuration config) {
+        // Default is handled by the caller
+        return config != null && MASTER_REGISTRY_CLASS_NAME
+                .equals(config.get(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY));
+    }
+
+    /**
+     * Builder class for MasterConnectionInfo
+     *
+     * @since 138
+     */
+    protected static class Builder extends AbstractRPCConnectionInfo.Builder {
+
+        public Builder(String url, Configuration config, ReadOnlyProps props, Properties info)
+                throws SQLException {
+            super(url, config, props, info);
+            if (!HAS_MASTER_REGISTRY) {
+                throw getMalFormedUrlException(
+                    "HBase version does not support Master registry for: " + url);
+            }
+        }
+
+        @Override
+        protected void normalize() throws SQLException {
+            normalizeMaster();
+        }
+
+        @Override
+        protected ConnectionInfo build() {
+            return new MasterConnectionInfo(isConnectionless, principal, keytab, user, haGroup,
+                    hostsList);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java
index 276b5128df..3184af7adf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java
@@ -69,13 +69,13 @@ public class ParallelPhoenixConnection implements PhoenixMonitoredConnection {
     CompletableFuture<PhoenixConnection> futureConnection2;
     public ParallelPhoenixConnection(ParallelPhoenixContext context) throws SQLException {
         this.context = context;
-        LOG.trace("First Url: {} Second Url: {}", context.getHaGroup().getGroupInfo().getUrl1(),
-                context.getHaGroup().getGroupInfo().getUrl2());
+        LOG.trace("First Url: {} Second Url: {}", context.getHaGroup().getGroupInfo().getJDBCUrl1(),
+                context.getHaGroup().getGroupInfo().getJDBCUrl2());
         futureConnection1 = context.chainOnConn1(() -> getConnection(context.getHaGroup(),
-                context.getHaGroup().getGroupInfo().getUrl1(),
+                context.getHaGroup().getGroupInfo().getJDBCUrl1(),
                 context.getProperties()));
         futureConnection2 = context.chainOnConn2(() -> getConnection(context.getHaGroup(),
-                context.getHaGroup().getGroupInfo().getUrl2(),
+                context.getHaGroup().getGroupInfo().getJDBCUrl2(),
                 context.getProperties()));
 
         // Ensure one connection is successful before returning
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java
index 8df87493b8..567abad2dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java
@@ -116,6 +116,7 @@ public class ParallelPhoenixContext {
     }
 
     public Properties getProperties() {
+        // FIXME: should return an immutable copy instead of the internal Properties object
         return properties;
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index e67f0d6817..ca412d5238 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.PropertiesUtil;
 
 
 /**
@@ -197,7 +198,7 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
             if (result == null) {
                 synchronized(this) {
                     result = services;
-                    if(result == null) {
+                    if (result == null) {
                         services = result = new QueryServicesImpl(getDefaultProps());
                     }
                 }
@@ -239,33 +240,34 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
     }
     
     @Override
-    protected ConnectionQueryServices getConnectionQueryServices(String url, final Properties info) throws SQLException {
+    protected ConnectionQueryServices getConnectionQueryServices(String url, final Properties infoIn) throws SQLException {
         lockInterruptibly(LockMode.READ);
         try {
             checkClosed();
-            ConnectionInfo connInfo = ConnectionInfo.create(url);
             SQLException sqlE = null;
             boolean success = false;
             final QueryServices services = getQueryServices();
             ConnectionQueryServices connectionQueryServices = null;
             // Also performs the Kerberos login if the URL/properties request this
-            final ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps(), info);
+            final Properties info = PropertiesUtil.deepCopy(infoIn);
+            final ConnectionInfo connInfo = ConnectionInfo.create(url, services.getProps(), info);
+            // Set the connection parameters to the normalized values derived from the URL
+            info.putAll(connInfo.asProps().asMap());
             try {
                 connectionQueryServices =
-                    connectionQueryServicesCache.get(normalizedConnInfo, new Callable<ConnectionQueryServices>() {
+                    connectionQueryServicesCache.get(connInfo, new Callable<ConnectionQueryServices>() {
                         @Override
                         public ConnectionQueryServices call() throws Exception {
                             ConnectionQueryServices connectionQueryServices;
-                            if (normalizedConnInfo.isConnectionless()) {
-                                connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info);
+                            if (connInfo.isConnectionless()) {
+                                connectionQueryServices = new ConnectionlessQueryServicesImpl(services, connInfo, info);
                             } else {
-                                connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info);
+                                connectionQueryServices = new ConnectionQueryServicesImpl(services, connInfo, info);
                             }
 
                             return connectionQueryServices;
                         }
                     });
-
                 connectionQueryServices.init(url, info);
                 success = true;
             } catch (ExecutionException ee){
@@ -281,7 +283,7 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
             finally {
                 if (!success) {
                     // Remove from map, as initialization failed
-                    connectionQueryServicesCache.invalidate(normalizedConnInfo);
+                    connectionQueryServicesCache.invalidate(connInfo);
                     if (sqlE != null) {
                         throw sqlE;
                     }
@@ -313,8 +315,7 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
      * @throws SQLException if fails to generate key for CQS to invalidate
      */
     void invalidateCache(String url, Properties properties) throws SQLException {
-        ConnectionInfo connInfo = ConnectionInfo.create(url)
-                .normalize(getQueryServices().getProps(), properties);
+        ConnectionInfo connInfo = ConnectionInfo.create(url, getQueryServices().getProps(), properties);
         LOGGER.info("Invalidating the CQS from cache for connInfo={}", connInfo);
         connectionQueryServicesCache.invalidate(connInfo);
         LOGGER.debug(connectionQueryServicesCache.asMap().keySet().stream().map(Objects::toString).collect(Collectors.joining(",")));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index d97e88844c..46f499f28d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -19,40 +19,25 @@ package org.apache.phoenix.jdbc;
 
 import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.DriverPropertyInfo;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.StringTokenizer;
-import java.util.Map.Entry;
 import java.util.logging.Logger;
 
 import javax.annotation.concurrent.Immutable;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseable;
-import org.slf4j.LoggerFactory;
-
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
 
 
@@ -70,11 +55,13 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
      */ 
     private final static String DNC_JDBC_PROTOCOL_SUFFIX = "//";
     private final static String DRIVER_NAME = "PhoenixEmbeddedDriver";
-    private static final String TERMINATOR = "" + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-    private static final String DELIMITERS = TERMINATOR + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
     private static final String TEST_URL_AT_END =  "" + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
     private static final String TEST_URL_IN_MIDDLE = TEST_URL_AT_END + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
 
+    private static final String[] SUPPORTED_PROTOCOLS =
+            new String[] { PhoenixRuntime.JDBC_PROTOCOL, PhoenixRuntime.JDBC_PROTOCOL_ZK,
+                PhoenixRuntime.JDBC_PROTOCOL_MASTER, PhoenixRuntime.JDBC_PROTOCOL_RPC };
+
     private final static DriverPropertyInfo[] EMPTY_INFO = new DriverPropertyInfo[0];
     public final static String MAJOR_VERSION_PROP = "DriverMajorVersion";
     public final static String MINOR_VERSION_PROP = "DriverMinorVersion";
@@ -85,7 +72,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
                     MAJOR_VERSION_PROP, Integer.toString(MetaDataProtocol.PHOENIX_MAJOR_VERSION),
                     MINOR_VERSION_PROP, Integer.toString(MetaDataProtocol.PHOENIX_MINOR_VERSION),
                     DRIVER_NAME_PROP, DRIVER_NAME));
-    
+
     PhoenixEmbeddedDriver() {
     }
     
@@ -98,33 +85,38 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
     @Override
     public boolean acceptsURL(String url) throws SQLException {
         if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
-            // A connection string of "jdbc:phoenix" is supported, since
-            // all the connection information can potentially be gotten
-            // out of the HBase config file
-            if (url.length() == PhoenixRuntime.JDBC_PROTOCOL.length()) {
-                return true;
-            }
-            // Same as above, except for "jdbc:phoenix;prop=<value>..."
-            if (PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR == url.charAt(PhoenixRuntime.JDBC_PROTOCOL.length())) {
-                return true;
-            }
-            if (PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR == url.charAt(PhoenixRuntime.JDBC_PROTOCOL.length())) {
-                int protoLength = PhoenixRuntime.JDBC_PROTOCOL.length() + 1;
-                // A connection string of "jdbc:phoenix:" matches this driver,
-                // but will end up as a MALFORMED_CONNECTION_URL exception later.
-                if (url.length() == protoLength) {
-                    return true;
+            for (String protocol : SUPPORTED_PROTOCOLS) {
+                // Skip protocols that do not prefix the url. A url that is exactly the
+                // protocol (e.g. "jdbc:phoenix") is supported below, since all the connection
+                // information can potentially be gotten out of the HBase config file
+                if (!url.startsWith(protocol)) {
+                    continue;
                 }
-                // Explicitly ignore connections of "jdbc:phoenix:thin"; leave them for
-                // the thin client
-                if (url.startsWith(PhoenixRuntime.JDBC_THIN_PROTOCOL)) {
-                    return false;
+                if (url.length() == protocol.length()) {
+                    return true;
                 }
-                // A connection string of the form "jdbc:phoenix://" means that
-                // the driver is remote which isn't supported, so return false.
-                if (!url.startsWith(DNC_JDBC_PROTOCOL_SUFFIX, protoLength)) {
+                // Same as above, except for "jdbc:phoenix;prop=<value>..."
+                if (PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR == url.charAt(protocol.length())) {
                     return true;
                 }
+                if (PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR == url.charAt(protocol.length())) {
+                    int protoLength = protocol.length() + 1;
+                    // A connection string of "jdbc:phoenix:" matches this driver,
+                    // but will end up as a MALFORMED_CONNECTION_URL exception later.
+                    if (url.length() == protoLength) {
+                        return true;
+                    }
+                    // Explicitly ignore connections of "jdbc:phoenix:thin"; leave them for
+                    // the thin client
+                    if (url.startsWith(PhoenixRuntime.JDBC_THIN_PROTOCOL)) {
+                        return false;
+                    }
+                    // A connection string of the form "jdbc:phoenix://" means that the
+                    // driver is remote, which isn't supported, so accept the url only without it.
+                    if (!url.startsWith(DNC_JDBC_PROTOCOL_SUFFIX, protoLength)) {
+                        return true;
+                    }
+                }
             }
         }
         return false;
@@ -140,18 +132,19 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
     }
 
     protected final Connection createConnection(String url, Properties info) throws SQLException {
-      Properties augmentedInfo = PropertiesUtil.deepCopy(info);
-      augmentedInfo.putAll(getDefaultProps().asMap());
+        Properties augmentedInfo = PropertiesUtil.deepCopy(info);
+        augmentedInfo.putAll(getDefaultProps().asMap());
         if (url.contains("|")) {
             // High availability connection using two clusters
             Optional<HighAvailabilityGroup> haGroup = HighAvailabilityGroup.get(url, augmentedInfo);
-            if(haGroup.isPresent()){
+            if (haGroup.isPresent()) {
                 return haGroup.get().connect(augmentedInfo);
             } else {
                 // If empty HA group is returned, fall back to single cluster.
-                url = HighAvailabilityGroup.getFallbackCluster(url, info)
-                        .orElseThrow(() -> new SQLException(
-                                "HA group can not be initialized, not fallback to single cluster"));
+                url =
+                        HighAvailabilityGroup.getFallbackCluster(url, info).orElseThrow(
+                            () -> new SQLException(
+                                    "HA group can not be initialized, fallback to single cluster"));
             }
         }
         ConnectionQueryServices cqs = getConnectionQueryServices(url, augmentedInfo);
@@ -198,485 +191,6 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
     public void close() throws SQLException {
     }
     
-    /**
-     *
-     * Class to encapsulate connection info for HBase
-     *
-     *
-     * @since 0.1.1
-     */
-    public static class ConnectionInfo {
-        private static final org.slf4j.Logger LOGGER =
-                LoggerFactory.getLogger(ConnectionInfo.class);
-        private static final Object KERBEROS_LOGIN_LOCK = new Object();
-        private static final char WINDOWS_SEPARATOR_CHAR = '\\';
-        private static final String REALM_EQUIVALENCY_WARNING_MSG = "Provided principal does not contan a realm and the default realm cannot be determined. Ignoring realm equivalency check.";
-        private static SQLException getMalFormedUrlException(String url) {
-            return new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
-            .setMessage(url).build().buildException();
-        }
-        
-        public String getZookeeperConnectionString() {
-            return getZookeeperQuorum() + ":" + getPort();
-        }
-        
-        /**
-         * Detect url with quorum:1,quorum:2 as HBase does not handle different port numbers
-         * for different quorum hostnames.
-         * @param portStr
-         * @return
-         */
-        private static boolean isMultiPortUrl(String portStr) {
-            int commaIndex = portStr.indexOf(',');
-            if (commaIndex > 0) {
-                try {
-                    Integer.parseInt(portStr.substring(0, commaIndex));
-                    return true;
-                } catch (NumberFormatException otherE) {
-                }
-            }
-            return false;
-        }
-        
-        public static ConnectionInfo create(String url) throws SQLException {
-            url = url == null ? "" : url;
-            if (url.isEmpty() || url.equalsIgnoreCase("jdbc:phoenix:")
-                    || url.equalsIgnoreCase("jdbc:phoenix")) {
-                return defaultConnectionInfo(url);
-            }
-            url = url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)
-                    ? url.substring(PhoenixRuntime.JDBC_PROTOCOL.length())
-                    : PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + url;
-            StringTokenizer tokenizer = new StringTokenizer(url, DELIMITERS, true);
-            int nTokens = 0;
-            String[] tokens = new String[5];
-            String token = null;
-            while (tokenizer.hasMoreTokens() &&
-                    !(token=tokenizer.nextToken()).equals(TERMINATOR) &&
-                    tokenizer.hasMoreTokens() && nTokens < tokens.length) {
-                token = tokenizer.nextToken();
-                // This would mean we have an empty string for a token which is illegal
-                if (DELIMITERS.contains(token)) {
-                    throw getMalFormedUrlException(url);
-                }
-                tokens[nTokens++] = token;
-            }
-            // Look-forward to see if the last token is actually the C:\\ path
-            if (tokenizer.hasMoreTokens() && !TERMINATOR.equals(token)) {
-                String extraToken = tokenizer.nextToken();
-                if (WINDOWS_SEPARATOR_CHAR == extraToken.charAt(0)) {
-                  String prevToken = tokens[nTokens - 1];
-                  tokens[nTokens - 1] = prevToken + ":" + extraToken;
-                  if (tokenizer.hasMoreTokens() && !(token=tokenizer.nextToken()).equals(TERMINATOR)) {
-                      throw getMalFormedUrlException(url);
-                  }
-                } else {
-                    throw getMalFormedUrlException(url);
-                }
-            }
-            String quorum = null;
-            Integer port = null;
-            String rootNode = null;
-            String principal = null;
-            String keytabFile = null;
-            int tokenIndex = 0;
-            if (nTokens > tokenIndex) {
-                quorum = tokens[tokenIndex++]; // Found quorum
-                if (nTokens > tokenIndex) {
-                    try {
-                        port = Integer.parseInt(tokens[tokenIndex]);
-                        if (port < 0) {
-                            throw getMalFormedUrlException(url);
-                        }
-                        tokenIndex++; // Found port
-                    } catch (NumberFormatException e) { // No port information
-                        if (isMultiPortUrl(tokens[tokenIndex])) {
-                            throw getMalFormedUrlException(url);
-                        }
-                    }
-                    if (nTokens > tokenIndex) {
-                        if (tokens[tokenIndex].startsWith("/")) {
-                            rootNode = tokens[tokenIndex++]; // Found rootNode
-                        }
-                        if (nTokens > tokenIndex) {
-                            principal = tokens[tokenIndex++]; // Found principal
-                            if (nTokens > tokenIndex) {
-                                keytabFile = tokens[tokenIndex++]; // Found keytabFile
-                                // There's still more after, try to see if it's a windows file path
-                                if (tokenIndex < tokens.length) {
-                                    String nextToken = tokens[tokenIndex++];
-                                    // The next token starts with the directory separator, assume
-                                    // it's still the keytab path.
-                                    if (null != nextToken && WINDOWS_SEPARATOR_CHAR == nextToken.charAt(0)) {
-                                        keytabFile = keytabFile + ":" + nextToken;
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-            return new ConnectionInfo(quorum,port,rootNode, principal, keytabFile);
-        }
-        
-        public ConnectionInfo normalize(ReadOnlyProps props, Properties info) throws SQLException {
-            String zookeeperQuorum = this.getZookeeperQuorum();
-            Integer port = this.getPort();
-            String rootNode = this.getRootNode();
-            String keytab = this.getKeytab();
-            String principal = this.getPrincipal();
-            // Normalize connInfo so that a url explicitly specifying versus implicitly inheriting
-            // the default values will both share the same ConnectionQueryServices.
-            if (zookeeperQuorum == null) {
-                zookeeperQuorum = props.get(QueryServices.ZOOKEEPER_QUORUM_ATTRIB);
-                if (zookeeperQuorum == null) {
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
-                    .setMessage(this.toString()).build().buildException();
-                }
-            }
-
-            if (port == null) {
-                if (!isConnectionless) {
-                    String portStr = props.get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
-                    if (portStr != null) {
-                        try {
-                            port = Integer.parseInt(portStr);
-                        } catch (NumberFormatException e) {
-                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
-                            .setMessage(this.toString()).build().buildException();
-                        }
-                    }
-                }
-            } else if (isConnectionless) {
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
-                .setMessage("Port may not be specified when using the connectionless url \"" + this.toString() + "\"").build().buildException();
-            }
-            if (rootNode == null) {
-                if (!isConnectionless) {
-                    rootNode = props.get(QueryServices.ZOOKEEPER_ROOT_NODE_ATTRIB);
-                }
-            } else if (isConnectionless) {
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
-                .setMessage("Root node may not be specified when using the connectionless url \"" + this.toString() + "\"").build().buildException();
-            }
-            if(principal == null){
-                if (!isConnectionless) {
-                    principal = props.get(QueryServices.HBASE_CLIENT_PRINCIPAL);
-                }
-            }
-            if(keytab == null){
-                 if (!isConnectionless) {
-                     keytab = props.get(QueryServices.HBASE_CLIENT_KEYTAB);
-                 }
-            }
-            if (!isConnectionless()) {
-                boolean credsProvidedInUrl = null != principal && null != keytab;
-                boolean credsProvidedInProps = info.containsKey(QueryServices.HBASE_CLIENT_PRINCIPAL) && info.containsKey(QueryServices.HBASE_CLIENT_KEYTAB);
-                if (credsProvidedInUrl || credsProvidedInProps) {
-                    // PHOENIX-3189 Because ConnectionInfo is immutable, we must make sure all parts of it are correct before
-                    // construction; this also requires the Kerberos user credentials object (since they are compared by reference
-                    // and not by value. If the user provided a principal and keytab via the JDBC url, we must make sure that the
-                    // Kerberos login happens *before* we construct the ConnectionInfo object. Otherwise, the use of ConnectionInfo
-                    // to determine when ConnectionQueryServices impl's should be reused will be broken.
-                    try {
-                        // Check if we need to authenticate with kerberos so that we cache the correct ConnectionInfo
-                        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-                        if (!currentUser.hasKerberosCredentials() || !isSameName(currentUser.getUserName(), principal)) {
-                            synchronized (KERBEROS_LOGIN_LOCK) {
-                                // Double check the current user, might have changed since we checked last. Don't want
-                                // to re-login if it's the same user.
-                                currentUser = UserGroupInformation.getCurrentUser();
-                                if (!currentUser.hasKerberosCredentials() || !isSameName(currentUser.getUserName(), principal)) {
-                                    final Configuration config = getConfiguration(props, info, principal, keytab);
-                                    LOGGER.info("Trying to connect to a secure cluster as {} " +
-                                                    "with keytab {}",
-                                            config.get(QueryServices.HBASE_CLIENT_PRINCIPAL),
-                                            config.get(QueryServices.HBASE_CLIENT_KEYTAB));
-                                    UserGroupInformation.setConfiguration(config);
-                                    User.login(config, QueryServices.HBASE_CLIENT_KEYTAB, QueryServices.HBASE_CLIENT_PRINCIPAL, null);
-                                    LOGGER.info("Successful login to secure cluster");
-                                }
-                            }
-                        } else {
-                            // The user already has Kerberos creds, so there isn't anything to change in the ConnectionInfo.
-                            LOGGER.debug("Already logged in as {}", currentUser);
-                        }
-                    } catch (IOException e) {
-                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
-                            .setRootCause(e).build().buildException();
-                    }
-                } else {
-                    LOGGER.debug("Principal and keytab not provided, not attempting Kerberos login");
-                }
-            } // else, no connection, no need to login
-            // Will use the current User from UGI
-            String haGroup = info.getProperty(HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR);
-            return new ConnectionInfo(zookeeperQuorum, port, rootNode, principal, keytab, haGroup);
-
-        }
-
-        // Visible for testing
-        static boolean isSameName(String currentName, String newName) throws IOException {
-            return isSameName(currentName, newName, null, getDefaultKerberosRealm());
-        }
-
-        /**
-         * Computes the default kerberos realm if one is available. If one cannot be computed, null
-         * is returned.
-         *
-         * @return The default kerberos realm, or null.
-         */
-        static String getDefaultKerberosRealm() {
-            try {
-                return KerberosUtil.getDefaultRealm();
-            } catch (Exception e) {
-                if (LOGGER.isDebugEnabled()) {
-                    // Include the stacktrace at DEBUG
-                    LOGGER.debug(REALM_EQUIVALENCY_WARNING_MSG, e);
-                } else {
-                    // Limit the content at WARN
-                    LOGGER.warn(REALM_EQUIVALENCY_WARNING_MSG);
-                }
-            }
-            return null;
-        }
-
-        static boolean isSameName(String currentName, String newName, String hostname) throws IOException {
-            return isSameName(currentName, newName, hostname, getDefaultKerberosRealm());
-        }
-
-        static boolean isSameName(String currentName, String newName, String hostname, String defaultRealm) throws IOException {
-            final boolean newNameContainsRealm = newName.indexOf('@') != -1;
-            // Make sure to replace "_HOST" if it exists before comparing the principals.
-            if (newName.contains(org.apache.hadoop.security.SecurityUtil.HOSTNAME_PATTERN)) {
-                if (newNameContainsRealm) {
-                    newName = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(newName, hostname);
-                } else {
-                    // If the principal ends with "/_HOST", replace "_HOST" with the hostname.
-                    if (newName.endsWith("/_HOST")) {
-                        newName = newName.substring(0, newName.length() - 5) + hostname;
-                    }
-                }
-            }
-            // The new name doesn't contain a realm and we could compute a default realm
-            if (!newNameContainsRealm && defaultRealm != null) {
-                return currentName.equals(newName + "@" + defaultRealm);
-            }
-            // We expect both names to contain a realm, so we can do a simple equality check
-            return currentName.equals(newName);
-        }
-
-        /**
-         * Constructs a Configuration object to use when performing a Kerberos login.
-         * @param props QueryServices properties
-         * @param info User-provided properties
-         * @param principal Kerberos user principal
-         * @param keytab Path to Kerberos user keytab
-         * @return Configuration object suitable for Kerberos login
-         */
-        private Configuration getConfiguration(ReadOnlyProps props, Properties info, String principal, String keytab) {
-            final Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
-            // Add QueryServices properties
-            for (Entry<String,String> entry : props) {
-                config.set(entry.getKey(), entry.getValue());
-            }
-            // Add any user-provided properties (via DriverManager)
-            if (info != null) {
-                for (Object key : info.keySet()) {
-                    config.set((String) key, info.getProperty((String) key));
-                }
-            }
-            // Set the principal and keytab if provided from the URL (overriding those provided in Properties)
-            if (null != principal) {
-                config.set(QueryServices.HBASE_CLIENT_PRINCIPAL, principal);
-            }
-            if (null != keytab) {
-                config.set(QueryServices.HBASE_CLIENT_KEYTAB, keytab);
-            }
-            return config;
-        }
-        
-        private final Integer port;
-        private final String rootNode;
-        private final String zookeeperQuorum;
-        private final boolean isConnectionless;
-        private final String principal;
-        private final String keytab;
-        private final User user;
-        private final String haGroup;
-
-        public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode, String principal, String keytab, String haGroup) {
-            this.zookeeperQuorum = zookeeperQuorum;
-            this.port = port;
-            this.rootNode = rootNode;
-            this.isConnectionless = PhoenixRuntime.CONNECTIONLESS.equals(zookeeperQuorum);
-            this.principal = principal;
-            this.keytab = keytab;
-            try {
-                this.user = User.getCurrent();
-            } catch (IOException e) {
-                throw new RuntimeException("Couldn't get the current user!!", e);
-            }
-            if (null == this.user) {
-                throw new RuntimeException("Acquired null user which should never happen");
-            }
-            this.haGroup = haGroup;
-        }
-
-        public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode, String principal, String keytab) {
-            this(zookeeperQuorum, port, rootNode, principal, keytab, null);
-        }
-        
-        public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode) {
-            this(zookeeperQuorum, port, rootNode, null, null);
-        }
-
-        /**
-         * Copy constructor for all members except {@link #user}.
-         *
-         * @param other The instance to copy
-         */
-        public ConnectionInfo(ConnectionInfo other) {
-            this(other.zookeeperQuorum, other.port, other.rootNode, other.principal, other.keytab, other.haGroup);
-        }
-
-        public ReadOnlyProps asProps() {
-            Map<String, String> connectionProps = Maps.newHashMapWithExpectedSize(3);
-            if (getZookeeperQuorum() != null) {
-                connectionProps.put(QueryServices.ZOOKEEPER_QUORUM_ATTRIB, getZookeeperQuorum());
-            }
-            if (getPort() != null) {
-                connectionProps.put(QueryServices.ZOOKEEPER_PORT_ATTRIB, getPort().toString());
-            }
-            if (getRootNode() != null) {
-                connectionProps.put(QueryServices.ZOOKEEPER_ROOT_NODE_ATTRIB, getRootNode());
-            }
-            if (getPrincipal() != null && getKeytab() != null) {
-                connectionProps.put(QueryServices.HBASE_CLIENT_PRINCIPAL, getPrincipal());
-                connectionProps.put(QueryServices.HBASE_CLIENT_KEYTAB, getKeytab());
-            }
-            if (getHaGroup()!= null) {
-                connectionProps.put(QueryServices.HA_GROUP_NAME_ATTRIB, getRootNode());
-            }
-
-            return connectionProps.isEmpty() ? ReadOnlyProps.EMPTY_PROPS : new ReadOnlyProps(
-                    connectionProps.entrySet().iterator());
-        }
-        
-        public boolean isConnectionless() {
-            return isConnectionless;
-        }
-        
-        public String getZookeeperQuorum() {
-            return zookeeperQuorum;
-        }
-
-        public Integer getPort() {
-            return port;
-        }
-
-        public String getRootNode() {
-            return rootNode;
-        }
-        
-        public String getKeytab() {
-            return keytab;
-        }
-
-        public String getPrincipal() {
-            return principal;
-        }
-
-        public User getUser() {
-            return user;
-        }
-
-        public String getHaGroup() {
-            return haGroup;
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((zookeeperQuorum == null) ? 0 : zookeeperQuorum.hashCode());
-            result = prime * result + ((port == null) ? 0 : port.hashCode());
-            result = prime * result + ((rootNode == null) ? 0 : rootNode.hashCode());
-            result = prime * result + ((principal == null) ? 0 : principal.hashCode());
-            result = prime * result + ((keytab == null) ? 0 : keytab.hashCode());
-            result = prime * result + ((haGroup == null) ? 0 : haGroup.hashCode());
-            // `user` is guaranteed to be non-null
-            result = prime * result + user.hashCode();
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj) return true;
-            if (obj == null) return false;
-            if (getClass() != obj.getClass()) return false;
-            ConnectionInfo other = (ConnectionInfo) obj;
-            // `user` is guaranteed to be non-null
-            if (!other.user.equals(user)) return false;
-            if (zookeeperQuorum == null) {
-                if (other.zookeeperQuorum != null) return false;
-            } else if (!zookeeperQuorum.equals(other.zookeeperQuorum)) return false;
-            if (port == null) {
-                if (other.port != null) return false;
-            } else if (!port.equals(other.port)) return false;
-            if (rootNode == null) {
-                if (other.rootNode != null) return false;
-            } else if (!rootNode.equals(other.rootNode)) return false;
-            if (principal == null) {
-                if (other.principal != null) return false;
-            } else if (!principal.equals(other.principal)) return false;
-            if (keytab == null) {
-                if (other.keytab != null) return false;
-            } else if (!keytab.equals(other.keytab)) return false;
-            if (haGroup == null) {
-                if (other.haGroup != null) return false;
-            } else if (!haGroup.equals(other.haGroup)) return false;
-            return true;
-        }
-        
-        @Override
-        public String toString() {
-            return zookeeperQuorum + (port == null ? "" : ":" + port)
-                    + (rootNode == null ? "" : ":" + rootNode)
-                    + (principal == null ? "" : ":" + principal)
-                    + (keytab == null ? "" : ":" + keytab)
-                    + (haGroup == null ? "" : ":" + haGroup);
-
-        }
-
-        public String toUrl() {
-            // Some tests like PhoenixConfigurationUtilTest add JDBC_PROTOCOL to zookeeper quorum
-            if (toString().contains(PhoenixRuntime.JDBC_PROTOCOL)) {
-                return  toString();
-            } else {
-                return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
-                        + toString();
-            }
-        }
-
-        private static ConnectionInfo defaultConnectionInfo(String url) throws SQLException {
-            Configuration config =
-                    HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
-            String quorum = config.get(HConstants.ZOOKEEPER_QUORUM);
-            if (quorum == null || quorum.isEmpty()) {
-                throw getMalFormedUrlException(url);
-            }
-            String clientPort = config.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-            Integer port = clientPort==null ? null : Integer.parseInt(clientPort);
-            if (port == null || port < 0) {
-                throw getMalFormedUrlException(url);
-            }
-            String znodeParent = config.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
-            LOGGER.debug("Getting default jdbc connection url "
-                    + quorum + ":" + port + ":" + znodeParent);
-            return new ConnectionInfo(quorum, port, znodeParent);
-        }
-    }
 
     public static boolean isTestUrl(String url) {
         return url.endsWith(TEST_URL_AT_END) || url.contains(TEST_URL_IN_MIDDLE);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
index a47a5aac1f..c6bdadc335 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.JacksonUtil;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -329,7 +330,7 @@ public class PhoenixHAAdminTool extends Configured implements Tool {
             Preconditions.checkNotNull(zkUrl);
             Preconditions.checkNotNull(conf);
             Preconditions.checkNotNull(highAvailibilityCuratorProvider);
-            this.zkUrl = zkUrl;
+            this.zkUrl = JDBCUtil.formatZookeeperUrl(zkUrl);
             this.conf = conf;
             conf.iterator().forEachRemaining(k -> properties.setProperty(k.getKey(), k.getValue()));
             this.highAvailibilityCuratorProvider = highAvailibilityCuratorProvider;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/RPCConnectionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/RPCConnectionInfo.java
new file mode 100644
index 0000000000..9baeb66b30
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/RPCConnectionInfo.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+/**
+ * ConnectionInfo class for org.apache.hadoop.hbase.client.RpcConnectionRegistry
+ *
+ * @since 138
+ */
+public class RPCConnectionInfo extends AbstractRPCConnectionInfo {
+
+    // We may be on an older HBase version, which does not even have RpcConnectionRegistry,
+    // so the property name and the class name are spelled out here as plain strings.
+    private static final String BOOTSTRAP_NODES = "hbase.client.bootstrap.servers";
+    private static final String RPC_REGISTRY_CLASS_NAME =
+            "org.apache.hadoop.hbase.client.RpcConnectionRegistry";
+
+    /**
+     * Creates the connection info. Instances are expected to be created through
+     * {@link Builder}.
+     *
+     * @param isConnectionless whether this describes a connectionless connection
+     * @param principal Kerberos principal, may be null
+     * @param keytab path of the Kerberos keytab, may be null
+     * @param user the user the connection belongs to
+     * @param haGroup name of the HA group, may be null
+     * @param bootstrapServers the bootstrap server list, as produced by the builder
+     */
+    protected RPCConnectionInfo(boolean isConnectionless, String principal, String keytab,
+            User user, String haGroup, String bootstrapServers) {
+        super(isConnectionless, principal, keytab, user, haGroup);
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    /**
+     * Returns the HBase client properties that select the RPC connection registry and point
+     * it at the bootstrap servers of this connection.
+     *
+     * @return the connection properties, or {@link ReadOnlyProps#EMPTY_PROPS} for a
+     *         connectionless instance
+     */
+    @Override
+    public ReadOnlyProps asProps() {
+        if (isConnectionless) {
+            return ReadOnlyProps.EMPTY_PROPS;
+        }
+
+        Map<String, String> connectionProps = getCommonProps();
+
+        connectionProps.put(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+            RPC_REGISTRY_CLASS_NAME);
+
+        if (getBoostrapServers() != null) {
+            // This is already normalized to include ports.
+            // Use the bootstrap servers property (the same one getBootstrapServerAddr() reads):
+            // RpcConnectionRegistry prefers it over the master address list, so writing the
+            // master address property instead would let a configured bootstrap server list
+            // win over the servers specified in the URL.
+            connectionProps.put(BOOTSTRAP_NODES, bootstrapServers);
+        }
+
+        return connectionProps.isEmpty() ? ReadOnlyProps.EMPTY_PROPS
+                : new ReadOnlyProps(connectionProps.entrySet().iterator());
+    }
+
+    /**
+     * Hashes the bootstrap server list on top of the hash of the parent class, so that the
+     * fields compared by {@code super.equals()} contribute to the hash as well.
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = super.hashCode();
+        result = prime * result + ((bootstrapServers == null) ? 0 : bootstrapServers.hashCode());
+        // Port is already provided in or normalized into bootstrapServers
+        return result;
+    }
+
+    /**
+     * Two instances are equal when the parent class considers them equal and their bootstrap
+     * server lists match.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (!super.equals(obj)) {
+            return false;
+        }
+        RPCConnectionInfo other = (RPCConnectionInfo) obj;
+        if (bootstrapServers == null) {
+            if (other.bootstrapServers != null) {
+                return false;
+            }
+        } else if (!bootstrapServers.equals(other.bootstrapServers)) {
+            return false;
+        }
+        // Port is already provided in or normalized into bootstrapServers
+        return true;
+    }
+
+    /**
+     * Returns the JDBC URL form of this connection info, using the RPC protocol prefix.
+     */
+    @Override
+    public String toUrl() {
+        return PhoenixRuntime.JDBC_PROTOCOL_RPC + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+                + toString();
+    }
+
+    /**
+     * Tells whether the configuration explicitly selects the RPC connection registry.
+     *
+     * @param config the configuration to inspect, may be null
+     * @return true if the registry implementation property names RpcConnectionRegistry
+     */
+    public static boolean isRPC(Configuration config) {
+        // Default is handled by the caller
+        return config != null && RPC_REGISTRY_CLASS_NAME
+                .equals(config.get(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY));
+    }
+
+    /**
+     * Builder for RPCConnectionInfo.
+     */
+    protected static class Builder extends AbstractRPCConnectionInfo.Builder {
+
+        /**
+         * @throws SQLException if the HBase version in use has no RPC connection registry
+         */
+        public Builder(String url, Configuration config, ReadOnlyProps props, Properties info)
+                throws SQLException {
+            super(url, config, props, info);
+            if (!HAS_RPC_REGISTRY) {
+                throw getMalFormedUrlException(
+                    "Hbase version does not support RPC registry for: " + url);
+            }
+        }
+
+        /**
+         * Normalizes the host list and port parsed from the URL, falling back to the
+         * configured bootstrap servers, and then to the Master registry normalization, when
+         * the URL does not specify any hosts.
+         *
+         * @throws SQLException if the port is not a non-negative integer, or if a port is
+         *             specified for a connectionless URL
+         */
+        @Override
+        protected void normalize() throws SQLException {
+            // Treat empty as null
+            if (hostsList != null && hostsList.isEmpty()) {
+                hostsList = null;
+            }
+            if (portString != null && portString.isEmpty()) {
+                portString = null;
+            }
+
+            // We don't have a default port for RPC Connections
+            // Well, we do if we fall back to Master
+            boolean noServerListinURL = false;
+            if (hostsList == null) {
+                hostsList = getBootstrapServerAddr();
+                noServerListinURL = true;
+                if (hostsList == null) {
+                    // Fall back to MasterRegistry behaviour
+                    normalizeMaster();
+                    return;
+                }
+            } else {
+                // '=' is accepted in place of ':' between host and port in the URL
+                hostsList = hostsList.replace('=', ':');
+            }
+
+            if (portString != null) {
+                try {
+                    port = Integer.parseInt(portString);
+                } catch (NumberFormatException e) {
+                    throw getMalFormedUrlException(url);
+                }
+                if (port < 0) {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+
+            if (isConnectionless) {
+                // We probably don't create connectionless RPCConnectionInfo objects
+                if (port != null) {
+                    throw getMalFormedUrlException(url);
+                } else {
+                    return;
+                }
+            }
+
+            // RpcConnectionRegistry doesn't have a default port property, but we accept the
+            // legacy format from the URL if both host list and port is provided
+            if (port != null && !noServerListinURL) {
+                hostsList = normalizeHostsList(hostsList, port);
+            }
+        }
+
+        /**
+         * Returns the configured bootstrap server list.
+         *
+         * @return the value of the bootstrap servers property, or null if it is unset or empty
+         */
+        public String getBootstrapServerAddr() {
+            String configuredBootstrapNodes = get(BOOTSTRAP_NODES);
+            if (!Strings.isNullOrEmpty(configuredBootstrapNodes)) {
+                return configuredBootstrapNodes;
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        protected ConnectionInfo build() {
+            return new RPCConnectionInfo(isConnectionless, principal, keytab, user, haGroup,
+                    hostsList);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ZKConnectionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ZKConnectionInfo.java
new file mode 100644
index 0000000000..28743ba5ee
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ZKConnectionInfo.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+/**
+ * ConnectionInfo class for org.apache.hadoop.hbase.client.ZKConnectionRegistry
+ *
+ * This used to be the only supported Registry in Phoenix (and the only one implemented by HBase)
+ *
+ */
+public class ZKConnectionInfo extends ConnectionInfo {
+
+    private static final String ZK_REGISTRY_NAME =
+            "org.apache.hadoop.hbase.client.ZKConnectionRegistry";
+
+    private final Integer zkPort;
+    private final String zkRootNode;
+    private final String zkHosts;
+
+    private ZKConnectionInfo(boolean isConnectionless, String principal, String keytab, User user,
+            String haGroup, String zkHosts, Integer zkPort, String zkRootNode) {
+        super(isConnectionless, principal, keytab, user, haGroup);
+        this.zkPort = zkPort;
+        this.zkRootNode = zkRootNode;
+        this.zkHosts = zkHosts;
+    }
+
+    public String getZkHosts() {
+        return zkHosts;
+    }
+
+    public Integer getZkPort() {
+        return zkPort;
+    }
+
+    public String getZkRootNode() {
+        return zkRootNode;
+    }
+
+    @Override
+    public String getZookeeperConnectionString() {
+        // Normalized form includes ports
+        return getZkHosts();
+    }
+
+    @Override
+    public ReadOnlyProps asProps() {
+        if (isConnectionless) {
+            return ReadOnlyProps.EMPTY_PROPS;
+        }
+
+        Map<String, String> connectionProps = getCommonProps();
+        connectionProps.put(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+            ZK_REGISTRY_NAME);
+
+        if (getZkHosts() != null) {
+            //This has the highest priority
+            connectionProps.put(HConstants.CLIENT_ZOOKEEPER_QUORUM, getZkHosts());
+        }
+        //Port is already normalized into zkHosts
+        if (getZkRootNode() != null) {
+            connectionProps.put(HConstants.ZOOKEEPER_ZNODE_PARENT, getZkRootNode());
+        }
+        return connectionProps.isEmpty() ? ReadOnlyProps.EMPTY_PROPS
+                : new ReadOnlyProps(connectionProps.entrySet().iterator());
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((zkHosts == null) ? 0 : zkHosts.hashCode());
+        //Port is already included in zkHosts
+        result = prime * result + ((zkRootNode == null) ? 0 : zkRootNode.hashCode());
+        result = prime * result + super.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!super.equals(obj)) {
+            return false;
+        }
+        ZKConnectionInfo other = (ZKConnectionInfo) obj;
+        if (zkHosts == null) {
+            if (other.zkHosts != null) {
+                return false;
+            }
+        } else if (!zkHosts.equals(other.zkHosts)) {
+            return false;
+        }
+        //Port is already normalized into zkHosts
+        if (zkRootNode == null) {
+            if (other.zkRootNode != null) {
+                return false;
+            }
+        } else if (!zkRootNode.equals(other.zkRootNode)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(zkHosts.replaceAll(":", "\\\\:"));
+        if (anyNotNull(zkPort, zkRootNode, principal, keytab)) {
+            sb.append(zkPort == null ? ":" : ":" + zkPort);
+        }
+        if (anyNotNull(zkRootNode, principal, keytab)) {
+            sb.append(zkRootNode == null ? ":" : ":" + zkRootNode);
+        }
+        if (anyNotNull(principal, keytab)) {
+            sb.append(principal == null ? ":" : ":" + principal);
+        }
+        if (anyNotNull(keytab)) {
+            sb.append(keytab == null ? ":" : ":" + keytab);
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public String toUrl() {
+        return PhoenixRuntime.JDBC_PROTOCOL_ZK + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+                + toString();
+    }
+
+    /**
+     * Builder helper class for ZKConnectionInfo
+     *
+     */
+    protected static class Builder extends ConnectionInfo.Builder {
+        private Integer zkPort;
+        private String zkRootNode;
+        private String zkHosts;
+
+        public Builder(String url, Configuration config, ReadOnlyProps props, Properties info) {
+            super(url, config, props, info);
+        }
+
+        @Override
+        protected ConnectionInfo create() throws SQLException {
+            parse();
+            normalize();
+            handleKerberosAndLogin();
+            setHaGroup();
+            return build();
+        }
+
+        /**
+         * Detect url with quorum:1,quorum:2 as HBase does not handle different port numbers
+         * for different quorum hostnames.
+         * @param portStr
+         * @return
+         */
+        private boolean isMultiPortUrl(String portStr) {
+            int commaIndex = portStr.indexOf(',');
+            if (commaIndex > 0) {
+                try {
+                    Integer.parseInt(portStr.substring(0, commaIndex));
+                    return true;
+                } catch (NumberFormatException otherE) {
+                }
+            }
+            return false;
+        }
+
+        private void parse() throws SQLException {
+            StringTokenizer tokenizer = getTokenizerWithoutProtocol();
+            int nTokens = 0;
+            String[] tokens = new String[5];
+            String token = null;
+            boolean wasDelimiter = false;
+            boolean first = true;
+            while (tokenizer.hasMoreTokens() && !(token = tokenizer.nextToken()).equals(TERMINATOR)
+                    && nTokens < tokens.length) {
+                // This would mean we have an empty string for a token which is illegal
+                if (DELIMITERS.contains(token)) {
+                    if (wasDelimiter && !first) {
+                        tokens[nTokens++] = "";
+                    }
+                    wasDelimiter = true;
+                } else {
+                    tokens[nTokens++] = token;
+                    wasDelimiter = false;
+                }
+                first = false;
+            }
+            // Look-forward to see if the last token is actually the C:\\ path
+            if (tokenizer.hasMoreTokens() && !TERMINATOR.equals(token)) {
+                String extraToken = tokenizer.nextToken();
+                if (WINDOWS_SEPARATOR_CHAR == extraToken.charAt(0)) {
+                    String prevToken = tokens[nTokens - 1];
+                    tokens[nTokens - 1] = prevToken + ":" + extraToken;
+                    if (tokenizer.hasMoreTokens()
+                            && !(token = tokenizer.nextToken()).equals(TERMINATOR)) {
+                        throw getMalFormedUrlException(url);
+                    }
+                } else {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+            int tokenIndex = 0;
+            if (nTokens > tokenIndex) {
+                zkHosts = tokens[tokenIndex++]; // Found quorum
+                if (nTokens > tokenIndex) {
+                    try {
+                        zkPort = Integer.parseInt(tokens[tokenIndex]);
+                        if (zkPort < 0) {
+                            throw getMalFormedUrlException(url);
+                        }
+                        tokenIndex++; // Found port
+                    } catch (NumberFormatException e) { // No port information
+                        if (tokens[tokenIndex].isEmpty()) {
+                            tokenIndex++; // Found empty port
+                        }
+                        if (isMultiPortUrl(tokens[tokenIndex])) {
+                            throw getMalFormedUrlException(url);
+                        }
+                        // Otherwise assume port is simply omitted
+                    }
+                    if (nTokens > tokenIndex) {
+                        if (tokens[tokenIndex].startsWith("/") || tokens[tokenIndex].isEmpty()) {
+                            zkRootNode = tokens[tokenIndex++]; // Found rootNode
+                        }
+                        if (nTokens > tokenIndex) {
+                            principal = tokens[tokenIndex++]; // Found principal
+                            if (nTokens > tokenIndex) {
+                                keytab = tokens[tokenIndex++]; // Found keytabFile
+                                // There's still more after, try to see if it's a windows file path
+                                if (tokenIndex < tokens.length) {
+                                    String nextToken = tokens[tokenIndex++];
+                                    // The next token starts with the directory separator, assume
+                                    // it's still the keytab path.
+                                    if (null != nextToken
+                                            && WINDOWS_SEPARATOR_CHAR == nextToken.charAt(0)) {
+                                        keytab = keytab + ":" + nextToken;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        protected ConnectionInfo build() {
+            return new ZKConnectionInfo(isConnectionless, principal, keytab, user, haGroup, zkHosts,
+                    zkPort, zkRootNode);
+        }
+
+        @Override
+        protected void normalize() throws SQLException {
+            // Treat empty as null
+            if (zkHosts != null && zkHosts.isEmpty()) {
+                zkHosts = null;
+            }
+            if (zkRootNode != null && zkRootNode.isEmpty()) {
+                zkRootNode = null;
+            }
+            isConnectionless = PhoenixRuntime.CONNECTIONLESS.equals(zkHosts);
+
+            if (isConnectionless) {
+                if (zkPort != null || zkRootNode != null) {
+                    throw getMalFormedUrlException(url);
+                } else {
+                    return;
+                }
+            }
+
+            // Normalize connInfo so that a url explicitly specifying versus implicitly inheriting
+            // the default values will both share the same ConnectionQueryServices.
+            if (zkPort == null) {
+                String zkPortString = get(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT);
+                if (zkPortString == null) {
+                    zkPortString = get(HConstants.ZOOKEEPER_CLIENT_PORT);
+                }
+                if (zkPortString == null) {
+                    zkPort = HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
+                } else {
+                    zkPort = Integer.parseInt(zkPortString);
+                }
+            }
+
+            if (zkHosts == null) {
+                zkHosts = get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+                if (zkHosts == null) {
+                    zkHosts = get(HConstants.ZOOKEEPER_QUORUM);
+                }
+                if (zkHosts == null) {
+                    throw getMalFormedUrlException(
+                        "Quorum not specified and hbase.client.zookeeper.quorum is not set"
+                                + " in configuration : " + url);
+                }
+            } else {
+                zkHosts = zkHosts.replaceAll("=", ":");
+            }
+
+            zkHosts = normalizeHostsList(zkHosts, zkPort);
+            // normalize out zkPort
+            zkPort = null;
+
+            if (zkRootNode == null) {
+                zkRootNode =
+                        get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+            }
+        }
+    }
+
+    public static boolean isZK(Configuration config) {
+        // Default is handled by the caller
+        return config != null
+                && ZK_REGISTRY_NAME.equals(config.get(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY));
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 8299baaee3..58eb0bdd20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -48,9 +48,9 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
@@ -199,7 +199,9 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
             // ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job.
             String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
-            PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum);
+            ConnectionInfo info =
+                    ConnectionInfo.create(PhoenixRuntime.JDBC_PROTOCOL_ZK + ":" + zkQuorum, conf,
+                        null, null);
             LOGGER.info("Configuring HBase connection to {}", info);
             for (Map.Entry<String,String> entry : info.asProps()) {
                 if (LOGGER.isDebugEnabled()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index efd602833c..ccd55fd059 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -25,92 +25,117 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 
-import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
-
 /**
  * Utility class to return a {@link Connection} .
  */
 public class ConnectionUtil {
 
+    private static final String TEST_PARAM =
+            PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
 
     /**
      * Retrieve the configured input Connection.
-     *
      * @param conf configuration containing connection information
      * @return the configured input connection
      */
     public static Connection getInputConnection(final Configuration conf) throws SQLException {
         return getInputConnection(conf, new Properties());
     }
-    
+
     /**
      * Retrieve the configured input Connection.
-     *
      * @param conf configuration containing connection information
      * @param props custom connection properties
      * @return the configured input connection
      */
-    public static Connection getInputConnection(final Configuration conf , final Properties props) throws SQLException {
+    public static Connection getInputConnection(final Configuration conf, final Properties props)
+            throws SQLException {
         Preconditions.checkNotNull(conf);
-		return getConnection(PhoenixConfigurationUtil.getInputCluster(conf),
-				PhoenixConfigurationUtil.getClientPort(conf),
-				PhoenixConfigurationUtil.getZNodeParent(conf),
-				PropertiesUtil.combineProperties(props, conf));
+        String zkQuorumOverride = PhoenixConfigurationUtil.getInputClusterZkQuorum(conf);
+        if (zkQuorumOverride != null) {
+            return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumOverride,
+                PropertiesUtil.combineProperties(props, conf));
+        } else {
+            // FIXME find some better way to get tests working
+            String zkQuorumForTest = PhoenixConfigurationUtil.getZKQuorum(conf);
+            if (zkQuorumForTest != null && (zkQuorumForTest.contains(TEST_PARAM)
+                    || zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS))) {
+                return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumForTest,
+                    PropertiesUtil.combineProperties(props, conf));
+            }
+            return DriverManager.getConnection("jdbc:phoenix",
+                PropertiesUtil.combineProperties(props, conf));
+        }
     }
 
     /**
      * Create the configured output Connection.
-     *
      * @param conf configuration containing the connection information
      * @return the configured output connection
      */
     public static Connection getOutputConnection(final Configuration conf) throws SQLException {
         return getOutputConnection(conf, new Properties());
     }
-    
+
     /**
      * Create the configured output Connection.
-     *
      * @param conf configuration containing the connection information
      * @return the configured output connection
      */
-    public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf, Set<String> ignoreTheseProps) throws SQLException {
+    public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf,
+            Set<String> ignoreTheseProps) throws SQLException {
         return getOutputConnection(conf, new Properties(), ignoreTheseProps);
     }
-    
+
     /**
      * Create the configured output Connection.
-     *
      * @param conf configuration containing the connection information
      * @param props custom connection properties
      * @return the configured output connection
      */
-    public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException {
+    public static Connection getOutputConnection(final Configuration conf, Properties props)
+            throws SQLException {
         return getOutputConnection(conf, props, Collections.<String>emptySet());
     }
-    
-    public static Connection getOutputConnection(final Configuration conf, Properties props, Set<String> withoutTheseProps) throws SQLException {
+
+    public static Connection getOutputConnection(final Configuration conf, Properties props,
+            Set<String> withoutTheseProps) throws SQLException {
         Preconditions.checkNotNull(conf);
-		return getConnection(PhoenixConfigurationUtil.getOutputCluster(conf),
-				PhoenixConfigurationUtil.getClientPort(conf),
-				PhoenixConfigurationUtil.getZNodeParent(conf),
-				PropertiesUtil.combineProperties(props, conf, withoutTheseProps));
+        String zkQuorumOverride = PhoenixConfigurationUtil.getOutputClusterZkQuorum(conf);
+        if (zkQuorumOverride != null) {
+            return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumOverride,
+                PropertiesUtil.combineProperties(props, conf, withoutTheseProps));
+        } else {
+            // FIXME find some better way to get tests working
+            String zkQuorumForTest = PhoenixConfigurationUtil.getZKQuorum(conf);
+            if (zkQuorumForTest != null && (zkQuorumForTest.contains(TEST_PARAM)
+                    || zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS))) {
+                return DriverManager.getConnection("jdbc:phoenix:" + zkQuorumForTest,
+                    PropertiesUtil.combineProperties(props, conf, withoutTheseProps));
+            }
+            return DriverManager.getConnection("jdbc:phoenix",
+                PropertiesUtil.combineProperties(props, conf, withoutTheseProps));
+        }
     }
 
     /**
      * Returns the {@link Connection} from a ZooKeeper cluster string.
-     *
      * @param quorum a ZooKeeper quorum connection string
      * @param clientPort a ZooKeeper client port
      * @param znodeParent a zookeeper znode parent
      * @return a Phoenix connection to the given connection string
      */
-    private static Connection getConnection(final String quorum, final Integer clientPort, final String znodeParent, Properties props) throws SQLException {
+    @Deprecated
+    private static Connection getConnection(final String quorum, final Integer clientPort,
+            final String znodeParent, Properties props) throws SQLException {
         Preconditions.checkNotNull(quorum);
-        return DriverManager.getConnection(QueryUtil.getUrl(quorum, clientPort, znodeParent), props);
+        return DriverManager.getConnection(QueryUtil.getUrl(quorum, clientPort, znodeParent),
+            props);
     }
 
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 13c5b70e1d..bb338b9128 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -605,12 +605,15 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(configuration);
         return configuration.get(OUTPUT_TABLE_NAME);
     }
-    
+
     /**
-     * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will read from
+     * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will read
+     * from. If MAPREDUCE_INPUT_CLUSTER_QUORUM is not set, then it returns the value of
+     * HConstants.ZOOKEEPER_QUORUM
      * @param configuration
      * @return ZooKeeper quorum string
      */
+    @Deprecated
     public static String getInputCluster(final Configuration configuration) {
         Preconditions.checkNotNull(configuration);
         String quorum = configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
@@ -621,10 +624,35 @@ public final class PhoenixConfigurationUtil {
     }
 
     /**
-     * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will write to
+     * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will
+     * read from
+     * @param configuration
+     * @return ZooKeeper quorum string if defined, null otherwise
+     */
+    public static String getInputClusterZkQuorum(final Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
+    }
+
+    /**
+     * Returns the value of HConstants.ZOOKEEPER_QUORUM.
+     * For tests only
+     * @param configuration
+     * @return ZooKeeper quorum string if defined, null otherwise
+     */
+    public static String getZKQuorum(final Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(HConstants.ZOOKEEPER_QUORUM);
+    }
+
+    /**
+     * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will write
+     * to. If MAPREDUCE_OUTPUT_CLUSTER_QUORUM is not set, then it returns the value of
+     * HConstants.ZOOKEEPER_QUORUM
      * @param configuration
      * @return ZooKeeper quorum string
      */
+    @Deprecated
     public static String getOutputCluster(final Configuration configuration) {
         Preconditions.checkNotNull(configuration);
         String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
@@ -633,12 +661,23 @@ public final class PhoenixConfigurationUtil {
         }
         return quorum;
     }
-    
+
+    /**
+     * Returns the ZooKeeper quorum override MAPREDUCE_OUTPUT_CLUSTER_QUORUM for mapreduce jobs
+     * @param configuration
+     * @return ZooKeeper quorum string if defined, null otherwise
+     */
+    public static String getOutputClusterZkQuorum(final Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
+    }
+
     /**
      * Returns the HBase Client Port
      * @param configuration
      * @return
      */
+    @Deprecated
     public static Integer getClientPort(final Configuration configuration) {
         Preconditions.checkNotNull(configuration);
         String clientPortString = configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT);
@@ -695,6 +734,7 @@ public final class PhoenixConfigurationUtil {
      * @param configuration
      * @return
      */
+    @Deprecated
     public static String getZNodeParent(final Configuration configuration) {
         Preconditions.checkNotNull(configuration);
         return configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index bf161a9499..dcc29f980e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -231,9 +231,9 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixTransactionalIndexer;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.log.ConnectionLimiter;
 import org.apache.phoenix.log.DefaultConnectionLimiter;
 import org.apache.phoenix.log.LoggingConnectionLimiter;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index bd66df3dac..7da182857d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -56,9 +56,9 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.log.QueryLoggerDisruptor;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index a273403195..d300929ac7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -126,8 +126,11 @@ public interface QueryServices extends SQLCloseable {
     public static final String HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB = "hbase.client.scanner.timeout.period";
     public static final String RPC_TIMEOUT_ATTRIB = "hbase.rpc.timeout";
     public static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir";
+    @Deprecated // Use HConstants directly
     public static final String ZOOKEEPER_QUORUM_ATTRIB = "hbase.zookeeper.quorum";
+    @Deprecated // Use HConstants directly
     public static final String ZOOKEEPER_PORT_ATTRIB = "hbase.zookeeper.property.clientPort";
+    @Deprecated // Use HConstants directly
     public static final String ZOOKEEPER_ROOT_NODE_ATTRIB = "zookeeper.znode.parent";
     public static final String DISTINCT_VALUE_COMPRESS_THRESHOLD_ATTRIB = "phoenix.distinct.value.compress.threshold";
     public static final String SEQUENCE_CACHE_SIZE_ATTRIB = "phoenix.sequence.cacheSize";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java
index 3c034e4b92..9fc001876d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.metrics.MetricInfo;
 import org.apache.phoenix.query.QueryServices;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
index 0dc25b6f05..9440da030c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
@@ -21,10 +21,8 @@ import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.htrace.Span;
 import org.apache.htrace.SpanReceiver;
-import org.apache.htrace.impl.MilliSpan;
 import org.apache.phoenix.metrics.MetricInfo;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.slf4j.Logger;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
index 5d2ef59722..cde98bcde6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
@@ -22,8 +22,8 @@ import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
 
 public class NotAvailableTransactionProvider implements PhoenixTransactionProvider {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
index bbddc1329c..4a34ae4340 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -32,8 +32,8 @@ import org.apache.phoenix.coprocessor.OmidGCProcessor;
 import org.apache.phoenix.coprocessor.OmidTransactionalProcessor;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
 
 public class OmidTransactionProvider implements PhoenixTransactionProvider {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
index d730c677a8..dd07c22528 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
@@ -23,8 +23,8 @@ import java.sql.SQLException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 
 public interface PhoenixTransactionProvider {
     public enum Feature {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index 848154f740..6e72fc1940 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -21,14 +21,14 @@ import static org.apache.phoenix.thirdparty.com.google.common.collect.Maps.newHa
 import static org.apache.phoenix.util.PhoenixRuntime.ANNOTATION_ATTRIB_PREFIX;
 
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.phoenix.jdbc.ConnectionInfo;
+import org.apache.phoenix.jdbc.ZKConnectionInfo;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PName;
@@ -207,24 +207,33 @@ public class JDBCUtil {
     }
 
     /**
-     * Formats a zkUrl which includes the zkQuroum of the jdbc url and the rest to sort the zk quorum hosts.
-     * Example input zkUrl "zk1.net,zk2.net,zk3.net:2181:/hbase"
-     * Example input zkUrl "zk1.net,zk2.net,zk3.net:2181:/hbase:user_foo"
-     * Returns: zk1.net,zk2.net,zk3.net:2181:/hbase
+     * Get the ZK quorum and root node part of the URL, which is used by the HA code internally
+     * to identify the clusters.
+     * As we interpret a missing protocol as ZK, this is mostly idempotent for zk quorum strings.
+     *
+     * @param jdbcUrl JDBC URL
+     * @return part of the URL determining the ZK quorum and node
+     * @throws RuntimeException if the URL is invalid, or does not resolve to a ZK Registry
+     * connection
      */
-    //TODO: Adjust for non-zkurl
-    public static String formatZookeeperUrl(String zkUrl){
-        String lowerZkUrl = zkUrl.toLowerCase();
-        String[] components = lowerZkUrl.split(String.valueOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR));
-        Preconditions.checkArgument(components.length > 0, "Unexpected zk url format.");
-        String[] hosts = components[0].split(",");
-        Preconditions.checkArgument(hosts.length > 0,"Unexpected zk url format no hosts found.");
-        String hostsStrings = Arrays.stream(hosts).sorted().collect(Collectors.joining(","));
-        components[0] = hostsStrings;
-        // host:port:path:principal
-        // additional arguments passed in url, strip them out
-        int endIdx = Integer.min(components.length, 3);
-        return Arrays.stream(components, 0, endIdx).collect(Collectors.joining(":"));
+    public static String formatZookeeperUrl(String jdbcUrl) {
+        try {
+            ConnectionInfo connInfo = ConnectionInfo.create(jdbcUrl, null, null);
+            // TODO in theory we could support non-ZK registries for HA.
+            // However, as HA already relies on ZK, this wouldn't be particularly useful,
+            // and would require significant changes.
+            if (!(connInfo instanceof ZKConnectionInfo)) {
+                throw new SQLException("HA connections must use ZooKeeper registry. " + jdbcUrl
+                        + " is not a Zookeeper HBase connection");
+            }
+            ZKConnectionInfo zkInfo = (ZKConnectionInfo) connInfo;
+            // Prefix every ':' inside the host list with a backslash, then join it to the
+            // root node with "::". Literal String.replace is used (no regex needed).
+            // NOTE(review): presumably "::" is an empty port field - confirm with the URL parser.
+            return zkInfo.getZkHosts().replace(":", "\\:") + "::" + zkInfo.getZkRootNode();
+        } catch (SQLException e) {
+            // Surface parse and registry-type failures as unchecked, keeping the cause.
+            throw new RuntimeException(e);
+        }
+    }
-
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 6dd9b861d8..04be259c4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -17,7 +17,10 @@
  */
 package org.apache.phoenix.util;
 
+import static org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILURES;
 import static org.apache.phoenix.schema.types.PDataType.ARRAY_TYPE_SUFFIX;
+import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -45,22 +48,13 @@ import java.util.TreeSet;
 
 import javax.annotation.Nullable;
 
-import org.apache.phoenix.jdbc.PhoenixMonitoredConnection;
-import org.apache.phoenix.jdbc.PhoenixMonitoredResultSet;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
-
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
@@ -69,13 +63,16 @@ import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixMonitoredConnection;
+import org.apache.phoenix.jdbc.PhoenixMonitoredResultSet;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
-import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.monitoring.GlobalMetric;
+import org.apache.phoenix.monitoring.HistogramDistribution;
 import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.monitoring.PhoenixTableMetric;
+import org.apache.phoenix.monitoring.TableMetricsManager;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -96,20 +93,19 @@ import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.monitoring.HistogramDistribution;
-import org.apache.phoenix.monitoring.PhoenixTableMetric;
-import org.apache.phoenix.monitoring.TableMetricsManager;
-
 import org.apache.phoenix.thirdparty.com.google.common.base.Function;
 import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
 import org.apache.phoenix.thirdparty.com.google.common.base.Splitter;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import static org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILURES;
-import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
 
 /**
  *
@@ -119,39 +115,77 @@ import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions
  * @since 0.1
  */
 public class PhoenixRuntime {
-    public final static char JDBC_PROTOCOL_TERMINATOR = ';';
-    public final static char JDBC_PROTOCOL_SEPARATOR = ':';
+    //TODO use strings, char needs a lot of error-prone conversions
+    public static final char JDBC_PROTOCOL_TERMINATOR = ';';
+    public static final char JDBC_PROTOCOL_SEPARATOR = ':';
 
     /**
      * JDBC URL jdbc protocol identifier
      */
-    public final static String JDBC_PROTOCOL_IDENTIFIER = "jdbc";
+    public static final String JDBC_PROTOCOL_IDENTIFIER = "jdbc";
 
     /**
-     * JDBC URL phoenix protocol identifier
+     * JDBC URL phoenix protocol identifier (protocol determined from Configuration)
+     */
+    public static final String JDBC_PHOENIX_PROTOCOL_IDENTIFIER = "phoenix";
+
+    /**
+     * JDBC URL phoenix protocol identifier for ZK HBase connection
      */
-    public final static String JDBC_PHOENIX_PROTOCOL_IDENTIFIER = "phoenix";
+    public static final String JDBC_PHOENIX_PROTOCOL_IDENTIFIER_ZK = "phoenix+zk";
+
+    /**
+     * JDBC URL phoenix protocol identifier for the deprecated Master based HBase connection
+     */
+    public static final String JDBC_PHOENIX_PROTOCOL_IDENTIFIER_MASTER = "phoenix+master";
+
+    /**
+     * JDBC URL phoenix protocol identifier for RPC based HBase connection
+     */
+    public static final String JDBC_PHOENIX_PROTOCOL_IDENTIFIER_RPC = "phoenix+rpc";
 
     /**
      * JDBC URL phoenix protocol identifier
      */
-    public final static String JDBC_PHOENIX_THIN_IDENTIFIER = "thin";
+    public static final String JDBC_PHOENIX_THIN_IDENTIFIER = "thin";
+
+    /**
+     * Root for the generic JDBC URL that the Phoenix accepts.
+     */
+    public static final String JDBC_PROTOCOL =
+            JDBC_PROTOCOL_IDENTIFIER + JDBC_PROTOCOL_SEPARATOR + JDBC_PHOENIX_PROTOCOL_IDENTIFIER;
 
     /**
-     * Root for the JDBC URL that the Phoenix accepts accepts.
+     * Root for the explicit ZK JDBC URL that the Phoenix accepts.
      */
-    public final static String JDBC_PROTOCOL = JDBC_PROTOCOL_IDENTIFIER + JDBC_PROTOCOL_SEPARATOR + JDBC_PHOENIX_PROTOCOL_IDENTIFIER;
+    public static final String JDBC_PROTOCOL_ZK =
+            JDBC_PROTOCOL_IDENTIFIER + JDBC_PROTOCOL_SEPARATOR
+                    + JDBC_PHOENIX_PROTOCOL_IDENTIFIER_ZK;
 
     /**
-     * Root for the JDBC URL used by the thin driver. Duplicated here to avoid dependencies
-     * between modules.
+     * Root for the explicit Master (HRPC) JDBC URL that the Phoenix accepts.
      */
-    public final static String JDBC_THIN_PROTOCOL = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + JDBC_PHOENIX_THIN_IDENTIFIER;
+    public static final String JDBC_PROTOCOL_MASTER =
+            JDBC_PROTOCOL_IDENTIFIER + JDBC_PROTOCOL_SEPARATOR
+                    + JDBC_PHOENIX_PROTOCOL_IDENTIFIER_MASTER;
 
+    /**
+     * Root for the explicit RPC JDBC URL that the Phoenix accepts.
+     */
+    public static final String JDBC_PROTOCOL_RPC =
+            JDBC_PROTOCOL_IDENTIFIER + JDBC_PROTOCOL_SEPARATOR
+                    + JDBC_PHOENIX_PROTOCOL_IDENTIFIER_RPC;
 
+    /**
+     * Root for the JDBC URL used by the thin driver. Duplicated here to avoid dependencies between
+     * modules.
+     */
+    public static final String JDBC_THIN_PROTOCOL =
+            JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + JDBC_PHOENIX_THIN_IDENTIFIER;
 
     @Deprecated
-    public final static String EMBEDDED_JDBC_PROTOCOL = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+    public static final String EMBEDDED_JDBC_PROTOCOL =
+            PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
     /**
      * Use this connection property to control HBase timestamps
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 00a8d8dc52..fb2e3434bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -65,7 +65,9 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import javax.annotation.Nullable;
@@ -74,7 +76,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction;
 import org.apache.phoenix.expression.function.IndexStateNameFunction;
 import org.apache.phoenix.expression.function.SQLIndexTypeFunction;
@@ -84,8 +85,8 @@ import org.apache.phoenix.expression.function.SqlTypeNameFunction;
 import org.apache.phoenix.expression.function.TransactionProviderNameFunction;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.TableName;
@@ -98,18 +99,17 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.tool.SchemaExtractionProcessor;
 import org.apache.phoenix.schema.tool.SchemaProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.thirdparty.com.google.common.base.Function;
 import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class QueryUtil {
 
@@ -299,6 +299,7 @@ public final class QueryUtil {
     /**
      * Create the Phoenix JDBC connection URL from the provided cluster connection details.
      */
+    @Deprecated
     public static String getUrl(String zkQuorum) {
         return getUrlInternal(zkQuorum, null, null, null);
     }
@@ -306,6 +307,7 @@ public final class QueryUtil {
     /**
      * Create the Phoenix JDBC connection URL from the provided cluster connection details.
      */
+    @Deprecated
     public static String getUrl(String zkQuorum, int clientPort) {
         return getUrlInternal(zkQuorum, clientPort, null, null);
     }
@@ -313,6 +315,7 @@ public final class QueryUtil {
     /**
      * Create the Phoenix JDBC connection URL from the provided cluster connection details.
      */
+    @Deprecated
     public static String getUrl(String zkQuorum, String znodeParent) {
         return getUrlInternal(zkQuorum, null, znodeParent, null);
     }
@@ -320,6 +323,7 @@ public final class QueryUtil {
     /**
      * Create the Phoenix JDBC connection URL from the provided cluster connection details.
      */
+    @Deprecated
     public static String getUrl(String zkQuorum, int port, String znodeParent, String principal) {
         return getUrlInternal(zkQuorum, port, znodeParent, principal);
     }
@@ -327,6 +331,7 @@ public final class QueryUtil {
     /**
      * Create the Phoenix JDBC connection URL from the provided cluster connection details.
      */
+    @Deprecated
     public static String getUrl(String zkQuorum, int port, String znodeParent) {
         return getUrlInternal(zkQuorum, port, znodeParent, null);
     }
@@ -334,6 +339,7 @@ public final class QueryUtil {
     /**
      * Create the Phoenix JDBC connection URL from the provided cluster connection details.
      */
+    @Deprecated
     public static String getUrl(String zkQuorum, Integer port, String znodeParent) {
         return getUrlInternal(zkQuorum, port, znodeParent, null);
     }
@@ -341,13 +347,15 @@ public final class QueryUtil {
     /**
      * Create the Phoenix JDBC connection URL from the provided cluster connection details.
      */
+    @Deprecated
     public static String getUrl(String zkQuorum, Integer port, String znodeParent, String principal) {
         return getUrlInternal(zkQuorum, port, znodeParent, principal);
     }
 
+    @Deprecated
     private static String getUrlInternal(String zkQuorum, Integer port, String znodeParent, String principal) {
-        return new PhoenixEmbeddedDriver.ConnectionInfo(zkQuorum, port, znodeParent, principal, null).toUrl()
-                + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+        return String.join(":", PhoenixRuntime.JDBC_PROTOCOL, zkQuorum, port == null ? "" : port.toString(), znodeParent == null ? "" : znodeParent, principal == null ? "" : principal)
+                + Character.toString(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR);
     }
 
     public static String getExplainPlan(ResultSet rs) throws SQLException {
@@ -435,12 +443,18 @@ public final class QueryUtil {
      */
     public static String getConnectionUrl(Properties props, Configuration conf, String principal)
             throws SQLException {
-        // read the hbase properties from the configuration
-        int port = getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT, props, conf);
-        // Build the ZK quorum server string with "server:clientport" list, separated by ','
-        final String server = getString(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST, props, conf);
-        String znodeParent = getString(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT, props, conf);
-        String url = getUrl(server, port, znodeParent, principal);
+        ReadOnlyProps propsWithPrincipal;
+        if (principal != null) {
+            Map<String, String> principalProp = new HashMap<>();
+            principalProp.put(QueryServices.HBASE_CLIENT_PRINCIPAL, principal);
+            propsWithPrincipal = new ReadOnlyProps(principalProp.entrySet().iterator());
+        } else {
+            propsWithPrincipal = ReadOnlyProps.EMPTY_PROPS;
+        }
+        ConnectionInfo info =
+                ConnectionInfo.createNoLogin(PhoenixRuntime.JDBC_PROTOCOL, conf, propsWithPrincipal,
+                    props);
+        String url = info.toUrl();
         if (url.endsWith(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + "")) {
             url = url.substring(0, url.length() - 1);
         }
@@ -462,7 +476,7 @@ public final class QueryUtil {
         }
         return url;
     }
-    
+
     private static int getInt(String key, int defaultValue, Properties props, Configuration conf) {
         if (conf == null) {
             Preconditions.checkNotNull(props);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
index 9f4229777d..3c2001205d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
@@ -42,8 +42,8 @@ import org.slf4j.LoggerFactory;
  */
 public class ClusterRoleRecordTest {
     private static final Logger LOG = LoggerFactory.getLogger(ClusterRoleRecordTest.class);
-    private static final String ZK1 = "zk1-1,zk1-2:2181:/hbase";
-    private static final String ZK2 = "zk2-1,zk2-2:2181:/hbase";
+    private static final String ZK1 = "zk1-1\\:2181,zk1-2\\:2181::/hbase";
+    private static final String ZK2 = "zk2-1\\:2181,zk2-2\\:2181::/hbase";
 
     @Rule
     public final TestName testName = new TestName();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
index ed74645d70..c64252d530 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
@@ -46,6 +46,8 @@ public class ParallelPhoenixConnectionFailureTest extends BaseTest {
     private static String url =
             JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + PhoenixRuntime.CONNECTIONLESS;
 
+    private static final int WAIT_MS = 30000; // timeout (ms) passed to waitFor() below
+
     @Test
     public void testExecuteQueryChainFailure() throws SQLException {
         HBaseTestingUtility hbaseTestingUtility = new HBaseTestingUtility();
@@ -77,16 +79,16 @@ public class ParallelPhoenixConnectionFailureTest extends BaseTest {
         parallelConn.createStatement().execute("SELECT * FROM SYSTEM.CATALOG");
         parallelConn.createStatement().execute("SELECT * FROM SYSTEM.CATALOG");
         // Verify successful execution on both connections
-        hbaseTestingUtility.waitFor(10000, () -> (numStatementsCreatedOnConn1.get() == 2)
+        hbaseTestingUtility.waitFor(WAIT_MS, () -> (numStatementsCreatedOnConn1.get() == 2)
                 && (numStatementsCreatedOnConn2.get() == 2));
         // Error on conn1, we shouldn't use conn1 after that
         doThrow(new SQLException()).when(connSpy1).createStatement();
         parallelConn.createStatement().execute("SELECT * FROM SYSTEM.CATALOG");
-        hbaseTestingUtility.waitFor(10000, () -> numStatementsCreatedOnConn2.get() == 3);
+        hbaseTestingUtility.waitFor(WAIT_MS, () -> numStatementsCreatedOnConn2.get() == 3);
         doAnswer(answer1).when(connSpy1).createStatement();
         // Should still have a successful execution only from conn2 since conn1 errored before
         parallelConn.createStatement().execute("SELECT * FROM SYSTEM.CATALOG");
-        hbaseTestingUtility.waitFor(10000, () -> (numStatementsCreatedOnConn1.get() == 2)
+        hbaseTestingUtility.waitFor(WAIT_MS, () -> (numStatementsCreatedOnConn1.get() == 2)
                 && (numStatementsCreatedOnConn2.get() == 4));
         // Any task that we chain on conn1 should error out
         assertTrue(context.chainOnConn1(() -> Boolean.TRUE).isCompletedExceptionally());
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
index 1cc3f3f346..fe439d7462 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
@@ -20,7 +20,10 @@ package org.apache.phoenix.jdbc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 import java.sql.Driver;
 import java.sql.DriverManager;
@@ -28,121 +31,324 @@ import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.junit.Test;
 
 public class PhoenixEmbeddedDriverTest {
+
     @Test
-    public void testGetConnectionInfo() throws SQLException {
+    public void testGetZKConnectionInfo() throws SQLException {
+        Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        String defaultQuorum = config.get(HConstants.ZOOKEEPER_QUORUM);
+
+        for (String protocol : new String[] { "phoenix", "phoenix+zk" }) {
+            String[] urls =
+                    new String[] { null,
+                            "",
+                            "jdbc:" + protocol + "",
+                            "jdbc:" + protocol + ";test=true",
+                            "jdbc:" + protocol + ":localhost",
+                            "localhost",
+                            "localhost;",
+                            "jdbc:" + protocol + ":localhost:123",
+                            "jdbc:" + protocol + ":localhost:123;foo=bar",
+                            "localhost:123",
+                            "jdbc:" + protocol + ":localhost:123:/hbase",
+                            "jdbc:" + protocol + ":localhost:123:/foo-bar",
+                            "jdbc:" + protocol + ":localhost:123:/foo-bar;foo=bas",
+                            "localhost:123:/foo-bar",
+                            "jdbc:" + protocol + ":localhost:/hbase",
+                            "jdbc:" + protocol + ":localhost:/foo-bar",
+                            "jdbc:" + protocol + ":localhost:/foo-bar;test=true",
+                            "localhost:/foo-bar",
+                            "jdbc:" + protocol + ":v1,v2,v3",
+                            "jdbc:" + protocol + ":v1,v2,v3;",
+                            "jdbc:" + protocol + ":v1,v2,v3;test=true",
+                            "v1,v2,v3",
+                            "jdbc:" + protocol + ":v1,v2,v3:/hbase",
+                            "jdbc:" + protocol + ":v1,v2,v3:/hbase;test=true",
+                            "v1,v2,v3:/foo-bar",
+                            "jdbc:" + protocol + ":v1,v2,v3:123:/hbase",
+                            "v1,v2,v3:123:/hbase",
+                            "jdbc:" + protocol + ":v1,v2,v3:123:/hbase;test=false",
+                            "jdbc:" + protocol + ":v1,v2,v3:123:/hbase:user/principal:/user.keytab;test=false",
+                            "jdbc:" + protocol + ":v1,v2,v3:123:/foo-bar:user/principal:/user.keytab;test=false",
+                            "jdbc:" + protocol + ":v1,v2,v3:123:user/principal:/user.keytab;test=false",
+                            "jdbc:" + protocol + ":v1,v2,v3:user/principal:/user.keytab;test=false",
+                            "jdbc:" + protocol + ":v1,v2,v3:/hbase:user/principal:/user.keytab;test=false",
+                            "jdbc:" + protocol + ":v1,v2,v3:LongRunningQueries;test=false",
+                            "jdbc:" + protocol + ":v1,v2,v3:345:LongRunningQueries;test=false",
+                            "jdbc:" + protocol + ":localhost:1234:user:C:\\user.keytab",
+                            "jdbc:" + protocol + ":v1,v2,v3:345:/hbase:user1:C:\\Documents and Settings\\user1\\user1.keytab;test=false", };
+            String[][] partsList =
+                    new String[][] { { defaultQuorum + ":2181", null, "/hbase" },
+                            { defaultQuorum + ":2181", null, "/hbase" },
+                            { defaultQuorum + ":2181", null, "/hbase" }, {},
+                            { "localhost:2181", null, "/hbase" },
+                            { "localhost:2181", null, "/hbase" },
+                            { "localhost:2181", null, "/hbase" },
+                            { "localhost:123", null, "/hbase" },
+                            { "localhost:123", null, "/hbase" },
+                            { "localhost:123", null, "/hbase" },
+                            { "localhost:123", null, "/hbase" },
+                            { "localhost:123", null, "/foo-bar" },
+                            { "localhost:123", null, "/foo-bar" },
+                            { "localhost:123", null, "/foo-bar" },
+                            { "localhost:2181", null, "/hbase" },
+                            { "localhost:2181", null, "/foo-bar" },
+                            { "localhost:2181", null, "/foo-bar" },
+                            { "localhost:2181", null, "/foo-bar" },
+                            { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+                            { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+                            { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+                            { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+                            { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+                            { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+                            { "v1:2181,v2:2181,v3:2181", null, "/foo-bar" },
+                            { "v1:123,v2:123,v3:123", null, "/hbase" },
+                            { "v1:123,v2:123,v3:123", null, "/hbase" },
+                            { "v1:123,v2:123,v3:123", null, "/hbase" },
+                            { "v1:123,v2:123,v3:123", null, "/hbase", "user/principal",
+                                    "/user.keytab" },
+                            { "v1:123,v2:123,v3:123", null, "/foo-bar", "user/principal",
+                                    "/user.keytab" },
+                            { "v1:123,v2:123,v3:123", null, "/hbase", "user/principal",
+                                    "/user.keytab" },
+                            { "v1:2181,v2:2181,v3:2181", null, "/hbase", "user/principal",
+                                    "/user.keytab" },
+                            { "v1:2181,v2:2181,v3:2181", null, "/hbase", "user/principal",
+                                    "/user.keytab" },
+                            { "v1:2181,v2:2181,v3:2181", null, "/hbase", "LongRunningQueries" },
+                            { "v1:345,v2:345,v3:345", null, "/hbase", "LongRunningQueries" },
+                            { "localhost:1234", null, "/hbase", "user", "C:\\user.keytab" },
+                            { "v1:345,v2:345,v3:345", null, "/hbase", "user1",
+                                    "C:\\Documents and Settings\\user1\\user1.keytab" }, };
+            assertEquals(urls.length, partsList.length);
+            for (int i = 0; i < urls.length; i++) {
+                int pos = 0; // index into parts of the field currently being verified
+                try {
+                    ZKConnectionInfo info =
+                            (ZKConnectionInfo) ConnectionInfo.create(urls[i], null, null);
+                    String[] parts = partsList[i];
+                    if (parts.length > pos) {
+                        assertEquals(parts[pos], info.getZkHosts());
+                    }
+                    if (parts.length > ++pos) {
+                        assertEquals(parts[pos], info.getZkPort());
+                    }
+                    if (parts.length > ++pos) {
+                        assertEquals(parts[pos], info.getZkRootNode());
+                    }
+                    if (parts.length > ++pos) {
+                        assertEquals(parts[pos], info.getPrincipal());
+                    }
+                    if (parts.length > ++pos) {
+                        assertEquals(parts[pos], info.getKeytab());
+                    }
+                } catch (AssertionError e) { // add URL/position context, keep the original as cause
+                    throw new AssertionError("For \"" + urls[i] + "\" at position: " + pos + ": "
+                            + e.getMessage(), e);
+                }
+            }
+        }
+
+    }
+
+    @Test
+    public void testGetMasterConnectionInfo() throws SQLException {
+        assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.3.0")>=0);
         Configuration config =
                 HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
-        String defaultQuorum = config.get(HConstants.ZOOKEEPER_QUORUM);
+        config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+        String defaultMasters = "defaultmaster1:1243,defaultmaster2:2345";
+        config.set("hbase.masters", defaultMasters);
 
         String[] urls = new String[] {
             null,
             "",
+            "jdbc:phoenix+master",
+            "jdbc:phoenix+master;test=true",
             "jdbc:phoenix",
-            "jdbc:phoenix;test=true",
-            "jdbc:phoenix:localhost",
+            "jdbc:phoenix+master:localhost",
             "localhost",
             "localhost;",
-            "jdbc:phoenix:localhost:123",
-            "jdbc:phoenix:localhost:123;foo=bar",
             "localhost:123",
-            "jdbc:phoenix:localhost:123:/hbase",
-            "jdbc:phoenix:localhost:123:/foo-bar",
-            "jdbc:phoenix:localhost:123:/foo-bar;foo=bas",
-            "localhost:123:/foo-bar",
-            "jdbc:phoenix:localhost:/hbase",
-            "jdbc:phoenix:localhost:/foo-bar",
-            "jdbc:phoenix:localhost:/foo-bar;test=true",
-            "localhost:/foo-bar",
-            "jdbc:phoenix:v1,v2,v3",
-            "jdbc:phoenix:v1,v2,v3;",
-            "jdbc:phoenix:v1,v2,v3;test=true",
-            "v1,v2,v3",
-            "jdbc:phoenix:v1,v2,v3:/hbase",
-            "jdbc:phoenix:v1,v2,v3:/hbase;test=true",
-            "v1,v2,v3:/foo-bar",
-            "jdbc:phoenix:v1,v2,v3:123:/hbase",
-            "v1,v2,v3:123:/hbase",
-            "jdbc:phoenix:v1,v2,v3:123:/hbase;test=false",
-            "jdbc:phoenix:v1,v2,v3:123:/hbase:user/principal:/user.keytab;test=false",
-            "jdbc:phoenix:v1,v2,v3:123:/foo-bar:user/principal:/user.keytab;test=false",
-            "jdbc:phoenix:v1,v2,v3:123:user/principal:/user.keytab;test=false",
-            "jdbc:phoenix:v1,v2,v3:user/principal:/user.keytab;test=false",
-            "jdbc:phoenix:v1,v2,v3:/hbase:user/principal:/user.keytab;test=false",
-            "jdbc:phoenix:v1,v2,v3:LongRunningQueries;test=false",
-            "jdbc:phoenix:v1,v2,v3:345:LongRunningQueries;test=false",
-            "jdbc:phoenix:localhost:1234:user:C:\\user.keytab",
-            "jdbc:phoenix:v1,v2,v3:345:/hbase:user1:C:\\Documents and Settings\\user1\\user1.keytab;test=false",
+            "localhost,localhost2:123;",
+            "localhost\\:123",
+            "localhost\\:123:",
+            "localhost\\:123::",
+            "localhost\\:123:::",
+            "localhost\\:123::::",
+            "localhost\\:123:::::",
+            "localhost\\:123:345::::",
+            "localhost,localhost2\\:123;",
+            "localhost,localhost2\\:123:456",
+            "localhost,localhost2\\:123:456;test=false",
+            "localhost\\:123:::user/principal:/user.keytab",
+            "localhost\\:123:::LongRunningQueries",
+            "localhost\\:123:::LongRunningQueries:",
+            "localhost\\:123:::LongRunningQueries::",
+            "localhost\\:123:::user/principal:C:\\user.keytab",
+            "localhost\\:123:::user/principal:C:\\Documents and Settings\\user1\\user1.keytab",
         };
-        ConnectionInfo[] infos = new ConnectionInfo[] {
-            new ConnectionInfo(defaultQuorum, 2181, "/hbase"),
-            new ConnectionInfo(defaultQuorum, 2181, "/hbase"),
-            new ConnectionInfo(defaultQuorum, 2181, "/hbase"),
-            new ConnectionInfo(null,null,null),
-            new ConnectionInfo("localhost",null,null),
-            new ConnectionInfo("localhost",null,null),
-            new ConnectionInfo("localhost",null,null),
-            new ConnectionInfo("localhost",123,null),
-            new ConnectionInfo("localhost",123,null),
-            new ConnectionInfo("localhost",123,null),
-            new ConnectionInfo("localhost",123,"/hbase"),
-            new ConnectionInfo("localhost",123,"/foo-bar"),
-            new ConnectionInfo("localhost",123,"/foo-bar"),
-            new ConnectionInfo("localhost",123,"/foo-bar"),
-            new ConnectionInfo("localhost",null,"/hbase"),
-            new ConnectionInfo("localhost",null,"/foo-bar"),
-            new ConnectionInfo("localhost",null,"/foo-bar"),
-            new ConnectionInfo("localhost",null,"/foo-bar"),
-            new ConnectionInfo("v1,v2,v3",null,null),
-            new ConnectionInfo("v1,v2,v3",null,null),
-            new ConnectionInfo("v1,v2,v3",null,null),
-            new ConnectionInfo("v1,v2,v3",null,null),
-            new ConnectionInfo("v1,v2,v3",null,"/hbase"),
-            new ConnectionInfo("v1,v2,v3",null,"/hbase"),
-            new ConnectionInfo("v1,v2,v3",null,"/foo-bar"),
-            new ConnectionInfo("v1,v2,v3",123,"/hbase"),
-            new ConnectionInfo("v1,v2,v3",123,"/hbase"),
-            new ConnectionInfo("v1,v2,v3",123,"/hbase"),
-            new ConnectionInfo("v1,v2,v3",123,"/hbase","user/principal", "/user.keytab" ),
-            new ConnectionInfo("v1,v2,v3",123,"/foo-bar","user/principal", "/user.keytab" ),
-            new ConnectionInfo("v1,v2,v3",123, null,"user/principal", "/user.keytab" ),
-            new ConnectionInfo("v1,v2,v3", null, null,"user/principal", "/user.keytab" ),
-            new ConnectionInfo("v1,v2,v3",null,"/hbase","user/principal", "/user.keytab" ),
-            new ConnectionInfo("v1,v2,v3",null,null,"LongRunningQueries", null ),
-            new ConnectionInfo("v1,v2,v3",345,null,"LongRunningQueries", null ),
-            new ConnectionInfo("localhost", 1234, null, "user", "C:\\user.keytab"),
-            new ConnectionInfo("v1,v2,v3", 345, "/hbase", "user1", "C:\\Documents and Settings\\user1\\user1.keytab"),
+        String[][] partsList = new String[][] {
+            {defaultMasters},
+            {defaultMasters},
+            {defaultMasters},
+            {defaultMasters},
+            {defaultMasters},
+            {"localhost:"+HConstants.DEFAULT_MASTER_PORT},
+            {"localhost:"+HConstants.DEFAULT_MASTER_PORT},
+            {"localhost:"+HConstants.DEFAULT_MASTER_PORT},
+            {"localhost:123"},
+            {"localhost2:123,localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost2:123,localhost:16000"},
+            {"localhost2:123,localhost:456"},
+            {"localhost2:123,localhost:456"},
+            {"localhost:123","user/principal","/user.keytab"},
+            {"localhost:123","LongRunningQueries",null},
+            {"localhost:123","LongRunningQueries",null},
+            {"localhost:123","LongRunningQueries",null},
+            {"localhost:123","user/principal","C:\\user.keytab"},
+            {"localhost:123","user/principal","C:\\Documents and Settings\\user1\\user1.keytab"},
         };
-        assertEquals(urls.length,infos.length);
+        assertEquals(urls.length,partsList.length);
         for (int i = 0; i < urls.length; i++) {
             try {
-                ConnectionInfo info = ConnectionInfo.create(urls[i]);
-                assertEquals(infos[i], info);
+                Configuration testConfig = new Configuration(config);
+                MasterConnectionInfo info = (MasterConnectionInfo)ConnectionInfo.create(urls[i], testConfig, null, null);
+                String[] parts = partsList[i];
+                assertEquals(parts[0], info.getBoostrapServers());
+                if (parts.length > 1) {
+                    assertEquals(parts[1], info.getPrincipal());
+                } else {
+                    assertNull(info.getPrincipal());
+                }
+                if (parts.length > 2) {
+                    assertEquals(parts[2], info.getKeytab());
+                } else {
+                    assertNull(info.getKeytab());
+                }
             } catch (AssertionError e) {
-                throw new AssertionError("For \"" + urls[i] + "\": " + e.getMessage());
+                throw new AssertionError("For \"" + urls[i] + "\": " + e.getMessage(), e);
             }
         }
     }
+
+    @Test
+    public void testGetRPCConnectionInfo() throws SQLException {
+        assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.5.0")>=0);
+        Configuration config =
+                HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+        String defaultBoostraps = "defaultmaster1:1243,defaultmaster2:2345";
+        config.set("hbase.client.bootstrap.servers", defaultBoostraps);
+
+        String[] urls = new String[] {
+            null,
+            "",
+            "jdbc:phoenix+rpc",
+            "jdbc:phoenix+rpc\";test=true",
+            "jdbc:phoenix",
+            "jdbc:phoenix+rpc\":localhost",
+            "localhost",
+            "localhost;",
+            "localhost:123",
+            "localhost,localhost2:123;",
+            "localhost\\:123",
+            "localhost\\:123:",
+            "localhost\\:123::",
+            "localhost\\:123:::",
+            "localhost\\:123::::",
+            "localhost\\:123:::::",
+            "localhost\\:123:345::::",
+            "localhost,localhost2\\:123;",
+            "localhost,localhost2\\:123:456",
+            "localhost,localhost2\\:123:456;test=false",
+            "localhost\\:123:::user/principal:/user.keytab",
+            "localhost\\:123:::LongRunningQueries",
+            "localhost\\:123:::LongRunningQueries:",
+            "localhost\\:123:::LongRunningQueries::",
+            "localhost\\:123:::user/principal:C:\\user.keytab",
+            "localhost\\:123:::user/principal:C:\\Documents and Settings\\user1\\user1.keytab",
+        };
+        String[][] partsList = new String[][] {
+            {defaultBoostraps},
+            {defaultBoostraps},
+            {defaultBoostraps},
+            {defaultBoostraps},
+            {defaultBoostraps},
+            {"localhost"},
+            {"localhost"},
+            {"localhost"},
+            {"localhost:123"},
+            {"localhost2:123,localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            {"localhost:123"},
+            //No default port
+            {"localhost,localhost2:123"},
+            {"localhost2:123,localhost:456"},
+            {"localhost2:123,localhost:456"},
+            {"localhost:123","user/principal","/user.keytab"},
+            {"localhost:123","LongRunningQueries",null},
+            {"localhost:123","LongRunningQueries",null},
+            {"localhost:123","LongRunningQueries",null},
+            {"localhost:123","user/principal","C:\\user.keytab"},
+            {"localhost:123","user/principal","C:\\Documents and Settings\\user1\\user1.keytab"},
+        };
+        assertEquals(urls.length,partsList.length);
+        for (int i = 0; i < urls.length; i++) {
+            try {
+                Configuration testConfig = new Configuration(config);
+                RPCConnectionInfo info = (RPCConnectionInfo)ConnectionInfo.create(urls[i], testConfig, null, null);
+                String[] parts = partsList[i];
+                assertEquals(parts[0], info.getBoostrapServers());
+                if (parts.length > 1) {
+                    assertEquals(parts[1], info.getPrincipal());
+                } else {
+                    assertNull(info.getPrincipal());
+                }
+                if (parts.length > 2) {
+                    assertEquals(parts[2], info.getKeytab());
+                } else {
+                    assertNull(info.getKeytab());
+                }
+            } catch (AssertionError e) { // add the URL for context, keep the original as cause
+                throw new AssertionError("For \"" + urls[i] + "\": " + e.getMessage(), e);
+            }
+        }
+    }
+
     @Test
     public void testNegativeGetConnectionInfo() throws SQLException {
         String[] urls = new String[] {
-            "jdbc:phoenix::",
-            "jdbc:phoenix:;",
+            //Reject unescaped ports in quorum string
             "jdbc:phoenix:v1:1,v2:2,v3:3",
             "jdbc:phoenix:v1:1,v2:2,v3:3;test=true",
             "jdbc:phoenix:v1,v2,v3:-1:/hbase;test=true",
             "jdbc:phoenix:v1,v2,v3:-1",
-            "jdbc:phoenix:v1,v2,v3:123::/hbase",
-            "jdbc:phoenix:v1,v2,v3:123::/hbase;test=false",
+            "jdbc:phoenix+zk:v1:1,v2:2,v3:3",
+            "jdbc:phoenix+zk:v1:1,v2:2,v3:3;test=true",
+            "jdbc:phoenix+zk:v1,v2,v3:-1:/hbase;test=true",
+            "jdbc:phoenix+zk:v1,v2,v3:-1"
         };
         for (String url : urls) {
             try {
-                ConnectionInfo.create(url);
+                ConnectionInfo.create(url, null, null);
                 throw new AssertionError("Expected exception for \"" + url + "\"");
             } catch (SQLException e) {
                 try {
@@ -153,7 +359,171 @@ public class PhoenixEmbeddedDriverTest {
             }
         }
     }
-    
+
+    @Test
+    public void testRPCNegativeGetConnectionInfo() throws SQLException {
+        assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.5.0")>=0);
+        String[] urls = new String[] {
+            //Reject unescaped and invalid ports in quorum string
+            "jdbc:phoenix+rpc:v1:1,v2:2,v3:3",
+            "jdbc:phoenix+rpc:v1:1,v2:2,v3:3;test=true",
+            "jdbc:phoenix+rpc:v1,v2,v3:-1:/hbase;test=true",
+            "jdbc:phoenix+rpc:v1,v2,v3:-1",
+            "jdbc:phoenix+master:v1:1,v2:2,v3:3",
+            "jdbc:phoenix+master:v1:1,v2:2,v3:3;test=true",
+            "jdbc:phoenix+master:v1,v2,v3:-1:/hbase;test=true",
+            "jdbc:phoenix+master:v1,v2,v3:-1",
+            //Reject a rootnode, and URLs that omit the (empty) rootnode field before the principal
+            "jdbc:phoenix+rpc:localhost,localhost2\\:123:456:rootNode",
+            "jdbc:phoenix+rpc:localhost,localhost2\\:123:456:rootNode:prinicpial:keystore",
+            "jdbc:phoenix+rpc:localhost,localhost2\\:123:456:prinicpial",
+            "jdbc:phoenix+rpc:localhost,localhost2\\:123:456:prinicpial:keystore",
+            "jdbc:phoenix+master:localhost,localhost2\\:123:456:rootNode",
+            "jdbc:phoenix+master:localhost,localhost2\\:123:456:rootNode:prinicpial:keystore",
+            "jdbc:phoenix+master:localhost,localhost2\\:123:456:prinicpial",
+            "jdbc:phoenix+master:localhost,localhost2\\:123:456:prinicpial:keystore",
+
+        };
+        for (String url : urls) {
+            try {
+                ConnectionInfo.create(url, null, null);
+                throw new AssertionError("Expected exception for \"" + url + "\"");
+            } catch (SQLException e) {
+                try {
+                    assertEquals(SQLExceptionCode.MALFORMED_CONNECTION_URL.getSQLState(), e.getSQLState());
+                } catch (AssertionError ae) {
+                    throw new AssertionError("For \"" + url + "\": " + ae.getMessage(), ae);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testMasterDefaults() throws SQLException {
+        assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.3.0") >= 0);
+        try {
+            Configuration config =
+                    HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+            config.set("hbase.client.registry.impl",
+                "org.apache.hadoop.hbase.client.MasterRegistry");
+            ConnectionInfo.create("jdbc:phoenix+master", config, null, null);
+            fail("Should have thrown exception");
+        } catch (SQLException expected) { // create() is expected to throw here
+        }
+
+        Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+        config.set("hbase.master.hostname", "master.hostname");
+        MasterConnectionInfo info =
+                (MasterConnectionInfo) ConnectionInfo.create("jdbc:phoenix+master", config, null,
+                    null);
+        assertEquals("master.hostname:16000", info.getBoostrapServers());
+
+        config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+        config.set("hbase.master.hostname", "master.hostname");
+        config.set("hbase.master.port", "17000");
+        info = (MasterConnectionInfo) ConnectionInfo.create("jdbc:phoenix", config, null, null);
+        assertEquals("master.hostname:17000", info.getBoostrapServers());
+
+        config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+        config.set("hbase.master.hostname", "master.hostname");
+        config.set("hbase.master.port", "17000");
+        config.set("hbase.masters", "master1:123,master2:234,master3:345");
+        info = (MasterConnectionInfo) ConnectionInfo.create("jdbc:phoenix", config, null, null);
+        assertEquals("master1:123,master2:234,master3:345", info.getBoostrapServers());
+
+        config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+        config.set("hbase.master.port", "17000");
+        info =
+                (MasterConnectionInfo) ConnectionInfo.create(
+                    "jdbc:phoenix+master:master1.from.url,master2.from.url", config, null, null);
+        assertEquals("master1.from.url:17000,master2.from.url:17000", info.getBoostrapServers());
+
+        config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+        config.set("hbase.master.port", "17000");
+        info =
+                (MasterConnectionInfo) ConnectionInfo.create(
+                    "jdbc:phoenix+master:master1.from.url\\:123,master2.from.url", config, null,
+                    null);
+        assertEquals("master1.from.url:123,master2.from.url:17000", info.getBoostrapServers());
+
+        config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+        config.set("hbase.master.hostname", "master.hostname");
+        config.set("hbase.master.port", "17000");
+        config.set("hbase.masters", "master1:123,master2:234,master3:345");
+        info =
+                (MasterConnectionInfo) ConnectionInfo.create(
+                    "jdbc:phoenix:master1.from.url\\:123,master2.from.url:18000", config, null,
+                    null);
+        assertEquals("master1.from.url:123,master2.from.url:18000", info.getBoostrapServers());
+
+        config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+        config.set("hbase.master.hostname", "master.hostname");
+        config.set("hbase.master.port", "17000");
+        config.set("hbase.masters", "master1:123,master2:234,master3:345");
+        info =
+                (MasterConnectionInfo) ConnectionInfo.create(
+                    "jdbc:phoenix:master1.from.url\\:123,master2.from.url\\:234:18000", config,
+                    null, null);
+        assertEquals("master1.from.url:123,master2.from.url:234", info.getBoostrapServers());
+    }
+
+    @Test
+    public void testRPCDefaults() throws SQLException {
+        assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.5.0") >= 0);
+        try {
+            Configuration config =
+                    HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+            config.set("hbase.client.registry.impl",
+                "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+            ConnectionInfo.create("jdbc:phoenix+rpc", config, null, null);
+            fail("Should have thrown exception");
+        } catch (SQLException expected) { // create() is expected to throw here
+        }
+
+        Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl",
+            "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+        config.set("hbase.client.bootstrap.servers", "bootstrap1\\:123,boostrap2\\:234");
+        RPCConnectionInfo info =
+                (RPCConnectionInfo) ConnectionInfo.create("jdbc:phoenix+rpc", config, null, null);
+        assertEquals("bootstrap1\\:123,boostrap2\\:234", info.getBoostrapServers());
+
+        config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl",
+            "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+        info =
+                (RPCConnectionInfo) ConnectionInfo.create(
+                    "jdbc:phoenix+rpc:bootstrap1.from.url,bootstrap2.from.url", config, null, null);
+        // TODO: it looks like HBase doesn't do port replacement/check for RPC servers either?
+        assertEquals("bootstrap1.from.url,bootstrap2.from.url", info.getBoostrapServers());
+
+        config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl",
+            "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+        info =
+                (RPCConnectionInfo) ConnectionInfo.create(
+                    "jdbc:phoenix+rpc:bootstrap1.from.url\\:123,bootstrap2.from.url\\::234", config,
+                    null, null);
+        // TODO: it looks like HBase doesn't do port replacement/check for RPC servers either?
+        assertEquals("bootstrap1.from.url:123,bootstrap2.from.url:234", info.getBoostrapServers());
+
+        // Check fallback to master properties
+        config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        config.set("hbase.client.registry.impl",
+            "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+        config.set("hbase.masters", "master1:123,master2:234,master3:345");
+        info = (RPCConnectionInfo) ConnectionInfo.create("jdbc:phoenix+rpc", config, null, null);
+        // TODO: it looks like HBase doesn't do port replacement/check for RPC servers either?
+        assertEquals("master1:123,master2:234,master3:345", info.getBoostrapServers());
+    }
+
     @Test
     public void testNotAccept() throws Exception {
         Driver driver = new PhoenixDriver();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index 8146368304..368a9c52a4 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionlessQueryServicesImpl;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesTestImpl;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 
 
@@ -88,16 +89,15 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
     }
     
     @Override // public for testing
-    public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
+    public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties infoIn) throws SQLException {
         checkClosed();
-        ConnectionInfo connInfo = ConnectionInfo.create(url);
+        final Properties info = PropertiesUtil.deepCopy(infoIn);
+        ConnectionInfo connInfo = ConnectionInfo.create(url, null, info);
         ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(connInfo);
         if (connectionQueryServices != null) {
             return connectionQueryServices;
         }
-        if (info == null) {
-            info = new Properties();
-        }
+        info.putAll(connInfo.asProps().asMap());
         if (connInfo.isConnectionless()) {
             connectionQueryServices = new ConnectionlessQueryServicesImpl(queryServices, connInfo, info);
         } else {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
index 7dab05f9e2..b3677b27d2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
@@ -18,7 +18,9 @@
 package org.apache.phoenix.mapreduce;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.mapred.JobContext;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -40,6 +42,7 @@ public class PhoenixMultiViewInputFormatTest {
 
         Configuration config = new Configuration();
         config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
+        config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
         JobContext mockContext = Mockito.mock(JobContext.class);
         when(mockContext.getConfiguration()).thenReturn(config);
 
@@ -55,6 +58,7 @@ public class PhoenixMultiViewInputFormatTest {
         Configuration config = new Configuration();
         config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
         config.set(MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ, "dummy.path");
+        config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
         JobContext mockContext = Mockito.mock(JobContext.class);
         when(mockContext.getConfiguration()).thenReturn(config);
 
@@ -73,6 +77,7 @@ public class PhoenixMultiViewInputFormatTest {
         Configuration config = new Configuration();
         config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
         config.set(MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ, "dummy.path");
+        config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
         JobContext mockContext = Mockito.mock(JobContext.class);
         when(mockContext.getConfiguration()).thenReturn(config);
 
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index c316fa45c0..b678514c64 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.mapreduce.util;
 
+import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
 import static org.junit.Assert.assertEquals;
 
 import java.sql.Connection;
@@ -41,7 +44,12 @@ import org.junit.Test;
 public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
     private static final String ORIGINAL_CLUSTER_QUORUM = "myzookeeperhost";
     private static final String OVERRIDE_CLUSTER_QUORUM = "myoverridezookeeperhost";
-    
+
+    // This is a hack that relies on the way the URL is re-constructed from the Configuration
+    // to generate a test connection for the MR jobs
+    protected static String TEST_ZK_QUORUM =
+            CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM + JDBC_PROTOCOL_TERMINATOR;
+
     @Test
     /**
      * This test reproduces the bug filed in PHOENIX-2310. 
@@ -68,7 +76,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
                     "  AS SELECT * FROM " + tableName + "\n";
             conn.createStatement().execute(viewDdl);
             final Configuration configuration = new Configuration ();
-            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
             PhoenixConfigurationUtil.setOutputTableName(configuration, viewName);
             PhoenixConfigurationUtil.setPhysicalTableName(configuration, viewName);
             PhoenixConfigurationUtil.setUpsertColumnNames(configuration, new String[] {"A_STRING", "A_BINARY", "COL1"});
@@ -97,7 +105,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
                     "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
             conn.createStatement().execute(ddl);
             final Configuration configuration = new Configuration ();
-            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
             PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
             PhoenixConfigurationUtil.setPhysicalTableName(configuration, tableName);
             PhoenixConfigurationUtil.setUpsertColumnNames(configuration, new String[] {"A_STRING", "A_BINARY", "COL1"});
@@ -124,7 +132,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
                     "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
             conn.createStatement().execute(ddl);
             final Configuration configuration = new Configuration ();
-            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
             PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
             PhoenixConfigurationUtil.setPhysicalTableName(configuration, tableName);
             final String upserStatement = PhoenixConfigurationUtil.getUpsertStatement(configuration);
@@ -145,7 +153,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
                     "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
             conn.createStatement().execute(ddl);
             final Configuration configuration = new Configuration ();
-            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
             PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
             final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
             final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + tableName ;
@@ -167,7 +175,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
                     "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
             conn.createStatement().execute(ddl);
             final Configuration configuration = new Configuration ();
-            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
             PhoenixConfigurationUtil.setInputTableName(configuration, fullTableName);
             final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
             final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + fullTableName;
@@ -187,7 +195,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
                     "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
             conn.createStatement().execute(ddl);
             final Configuration configuration = new Configuration ();
-            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
             PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
             PhoenixConfigurationUtil.setSelectColumnNames(configuration, new String[]{"A_BINARY"});
             final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
@@ -207,7 +215,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
                     "  (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])\n";
             conn.createStatement().execute(ddl);
             final Configuration configuration = new Configuration ();
-            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
             PhoenixConfigurationUtil.setSelectColumnNames(configuration,new String[]{"ID","VCARRAY"});
             PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
             PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/transaction/OmidTransactionService.java b/phoenix-core/src/test/java/org/apache/phoenix/transaction/OmidTransactionService.java
index 64e721c04f..02531faf77 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/transaction/OmidTransactionService.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/transaction/OmidTransactionService.java
@@ -35,7 +35,7 @@ import org.apache.omid.tso.client.OmidClientConfiguration;
 import org.apache.omid.tso.client.TSOClient;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/transaction/TransactionServiceManager.java b/phoenix-core/src/test/java/org/apache/phoenix/transaction/TransactionServiceManager.java
index f3c1bb9ce2..ebb892101e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/transaction/TransactionServiceManager.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/transaction/TransactionServiceManager.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.transaction;
 import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
 
 public class TransactionServiceManager {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
index 73eb637113..11add1ffcb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
@@ -144,7 +144,7 @@ public class JDBCUtilTest {
 
     @Test
     public void formatZookeeperUrlSameOrderTest() {
-        String zk1 = "zk1.net,zk2.net,zk3.net:2181:/hbase";
+        String zk1 = "zk1.net\\:2181,zk2.net\\:2181,zk3.net\\:2181::/hbase";
         String result = JDBCUtil.formatZookeeperUrl(zk1);
         assertEquals(zk1,result);
     }
@@ -154,21 +154,21 @@ public class JDBCUtilTest {
     public void formatZookeeperUrlDifferentOrderTest() {
         String zk1 = "zk3.net,zk2.net,zk1.net:2181:/hbase";
         String result = JDBCUtil.formatZookeeperUrl(zk1);
-        assertEquals("zk1.net,zk2.net,zk3.net:2181:/hbase",result);
+        assertEquals("zk1.net\\:2181,zk2.net\\:2181,zk3.net\\:2181::/hbase",result);
     }
 
     @Test
     public void formatZookeeperUrlNoTrailersTest() {
         String zk1 = "zk1.net,zk2.net,zk3.net";
         String result = JDBCUtil.formatZookeeperUrl(zk1);
-        assertEquals(zk1,result);
+        assertEquals("zk1.net\\:2181,zk2.net\\:2181,zk3.net\\:2181::/hbase",result);
     }
 
     @Test
     public void formatZookeeperUrlToLowercaseTest() {
         String zk1 = "MYHOST1.NET,MYHOST2.NET";
         String result = JDBCUtil.formatZookeeperUrl(zk1);
-        assertEquals(zk1.toLowerCase(),result);
+        assertEquals("myhost1.net\\:2181,myhost2.net\\:2181::/hbase",result);
     }
 
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 63dd88581f..0719dc5a61 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.sql.Types;
+import java.util.Arrays;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -141,18 +142,25 @@ public class QueryUtilTest {
 
     private void validateUrl(String url) {
         String prefix = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-        assertTrue("JDBC URL missing jdbc protocol prefix", url.startsWith(prefix));
+        String zkPrefix = PhoenixRuntime.JDBC_PROTOCOL_ZK+ PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+        String masterPrefix = PhoenixRuntime.JDBC_PROTOCOL_MASTER + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+        String rpcPrefix = PhoenixRuntime.JDBC_PROTOCOL_RPC + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
+        assertTrue("JDBC URL missing jdbc protocol prefix",
+            url.startsWith(prefix) || url.startsWith(zkPrefix) || url.startsWith(masterPrefix)
+                    || url.startsWith(rpcPrefix));
         assertTrue("JDBC URL missing jdbc terminator suffix", url.endsWith(";"));
+        url = url.replaceAll("\\\\:", "=");
         // remove the prefix, should only be left with server[,server...]:port:/znode
-        url = url.substring(prefix.length());
         String[] splits = url.split(":");
+        splits = Arrays.copyOfRange(splits, 2, splits.length);
         assertTrue("zk details should contain at least server component", splits.length >= 1);
         // make sure that each server is comma separated
-        String[] servers = splits[0].split(",");
+        String[] servers = splits[0].replaceAll("=", "\\\\:").split(",");
         for(String server: servers){
             assertFalse("Found whitespace in server names for url: " + url, server.contains(" "));
         }
-        if (splits.length >= 2) {
+        if (splits.length >= 2 && !splits[1].isEmpty()) {
             // second bit is a port number, should not through
             try {
                 Integer.parseInt(splits[1]);