You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@phoenix.apache.org by st...@apache.org on 2023/11/06 07:47:14 UTC
(phoenix) branch master updated: PHOENIX-6523 Support for HBase Registry Implementations through Phoenix connection URL
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 09d2299706 PHOENIX-6523 Support for HBase Registry Implementations through Phoenix connection URL
09d2299706 is described below
commit 09d229970627eb9b0a0b2ad565f10ba4d3bfeb9c
Author: Istvan Toth <st...@apache.org>
AuthorDate: Thu Aug 3 11:17:35 2023 +0200
PHOENIX-6523 Support for HBase Registry Implementations through Phoenix connection URL
Co-authored-by: Ramie <ra...@salesforce.com>
---
.../org/apache/phoenix/end2end/ConnectionIT.java | 156 ++++++
.../end2end/ConnectionQueryServicesTestImpl.java | 2 +-
.../apache/phoenix/end2end/PhoenixTTLToolIT.java | 43 +-
.../SystemTablesCreationOnConnectionIT.java | 8 +-
.../phoenix/end2end/SystemTablesUpgradeIT.java | 4 +-
.../end2end/index/MutableIndexFailureIT.java | 7 +-
.../transform/TransformMonitorExtendedIT.java | 19 +
.../phoenix/jdbc/FailoverPhoenixConnectionIT.java | 18 +-
.../phoenix/jdbc/HighAvailabilityGroupIT.java | 2 +-
.../phoenix/jdbc/HighAvailabilityGroupTestIT.java | 8 +-
.../jdbc/HighAvailabilityTestingUtility.java | 10 +-
.../phoenix/jdbc/ParallelPhoenixConnectionIT.java | 2 +
.../phoenix/jdbc/SecureUserConnectionsIT.java | 41 +-
.../monitoring/PhoenixTableLevelMetricsIT.java | 10 +-
.../replication/SystemCatalogWALEntryFilterIT.java | 14 +-
.../phoenix/jdbc/AbstractRPCConnectionInfo.java | 263 ++++++++++
.../org/apache/phoenix/jdbc/ClusterRoleRecord.java | 1 +
.../org/apache/phoenix/jdbc/ConnectionInfo.java | 545 ++++++++++++++++++++
.../apache/phoenix/jdbc/HighAvailabilityGroup.java | 21 +-
.../apache/phoenix/jdbc/MasterConnectionInfo.java | 103 ++++
.../phoenix/jdbc/ParallelPhoenixConnection.java | 8 +-
.../phoenix/jdbc/ParallelPhoenixContext.java | 1 +
.../org/apache/phoenix/jdbc/PhoenixDriver.java | 25 +-
.../apache/phoenix/jdbc/PhoenixEmbeddedDriver.java | 568 ++-------------------
.../apache/phoenix/jdbc/PhoenixHAAdminTool.java | 3 +-
.../org/apache/phoenix/jdbc/RPCConnectionInfo.java | 188 +++++++
.../org/apache/phoenix/jdbc/ZKConnectionInfo.java | 341 +++++++++++++
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 6 +-
.../phoenix/mapreduce/util/ConnectionUtil.java | 77 ++-
.../mapreduce/util/PhoenixConfigurationUtil.java | 48 +-
.../phoenix/query/ConnectionQueryServicesImpl.java | 2 +-
.../query/ConnectionlessQueryServicesImpl.java | 2 +-
.../org/apache/phoenix/query/QueryServices.java | 3 +
.../java/org/apache/phoenix/trace/TraceReader.java | 1 -
.../apache/phoenix/trace/TraceSpanReceiver.java | 2 -
.../NotAvailableTransactionProvider.java | 2 +-
.../transaction/OmidTransactionProvider.java | 2 +-
.../transaction/PhoenixTransactionProvider.java | 2 +-
.../java/org/apache/phoenix/util/JDBCUtil.java | 49 +-
.../org/apache/phoenix/util/PhoenixRuntime.java | 98 ++--
.../java/org/apache/phoenix/util/QueryUtil.java | 46 +-
.../apache/phoenix/jdbc/ClusterRoleRecordTest.java | 4 +-
.../jdbc/ParallelPhoenixConnectionFailureTest.java | 8 +-
.../phoenix/jdbc/PhoenixEmbeddedDriverTest.java | 534 ++++++++++++++++---
.../org/apache/phoenix/jdbc/PhoenixTestDriver.java | 10 +-
.../mapreduce/PhoenixMultiViewInputFormatTest.java | 5 +
.../util/PhoenixConfigurationUtilTest.java | 24 +-
.../transaction/OmidTransactionService.java | 2 +-
.../transaction/TransactionServiceManager.java | 2 +-
.../java/org/apache/phoenix/util/JDBCUtilTest.java | 8 +-
.../org/apache/phoenix/util/QueryUtilTest.java | 16 +-
51 files changed, 2517 insertions(+), 847 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionIT.java
new file mode 100644
index 0000000000..243b3c730a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionIT.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.util.InstanceResolver;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConnectionIT {
+
+ private static HBaseTestingUtility hbaseTestUtil;
+ private static Configuration conf;
+
+ private static int tableCounter;
+
+ @BeforeClass
+ public static synchronized void setUp() throws Exception {
+ hbaseTestUtil = new HBaseTestingUtility();
+ conf = hbaseTestUtil.getConfiguration();
+ setUpConfigForMiniCluster(conf);
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-test");
+ hbaseTestUtil.startMiniCluster();
+ Class.forName(PhoenixDriver.class.getName());
+ InstanceResolver.clearSingletons();
+ // Make sure the ConnectionInfo doesn't try to pull a default Configuration
+ InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+ @Override
+ public Configuration getConfiguration() {
+ return new Configuration(conf);
+ }
+
+ @Override
+ public Configuration getConfiguration(Configuration confToClone) {
+ Configuration copy = new Configuration(conf);
+ copy.addResource(confToClone);
+ return copy;
+ }
+ });
+ }
+
+ @AfterClass
+ public static synchronized void cleanUp() throws Exception {
+ InstanceResolver.clearSingletons();
+ }
+
+ @Test
+ public void testInputAndOutputConnections() throws SQLException {
+ try (Connection inputConnection = ConnectionUtil.getInputConnection(conf)) {
+ smoke(inputConnection);
+ }
+ try (Connection outputConnection = ConnectionUtil.getOutputConnection(conf)) {
+ smoke(outputConnection);
+ }
+ }
+
+ private void smoke(Connection conn) throws SQLException {
+ String table = "t" + tableCounter++;
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute("create table " + table + " (a integer primary key,b varchar)");
+ stmt.execute("upsert into " + table + " values(1,'foo')");
+ conn.commit();
+ ResultSet rs = stmt.executeQuery("select count(*) from " + table);
+ rs.next();
+ assertEquals(1, rs.getInt(1));
+ }
+ }
+
+ @Test
+ public void testZkConnections() throws SQLException {
+ String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+ String zkPort = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+ try (PhoenixConnection conn1 =
+ (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix");
+ PhoenixConnection conn2 =
+ (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix+zk");
+ PhoenixConnection conn3 =
+ (PhoenixConnection) DriverManager
+ .getConnection("jdbc:phoenix+zk:" + zkQuorum + ":" + zkPort);) {
+ smoke(conn1);
+ smoke(conn2);
+ smoke(conn3);
+ assertEquals(conn1.getQueryServices(), conn2.getQueryServices());
+ assertEquals(conn1.getQueryServices(), conn3.getQueryServices());
+ }
+ }
+
+ @Test
+ public void testMasterConnections() throws SQLException {
+ assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.3.0") >= 0);
+ int masterPortString = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
+ String masterHosts = conf.get(HConstants.MASTER_ADDRS_KEY);
+ try (PhoenixConnection conn1 =
+ (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix+master");
+ PhoenixConnection conn2 =
+ (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix+master:"
+ + masterHosts.replaceAll(":", "\\\\:") + ":" + masterPortString);) {
+ smoke(conn1);
+ smoke(conn2);
+ assertEquals(conn1.getQueryServices(), conn2.getQueryServices());
+ }
+ }
+
+ @Test
+ public void testRPCConnections() throws SQLException {
+ assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.5.0") >= 0);
+ String masterHosts = conf.get(HConstants.MASTER_ADDRS_KEY);
+
+ try (PhoenixConnection conn1 =
+ (PhoenixConnection) DriverManager.getConnection("jdbc:phoenix+rpc");
+ PhoenixConnection conn2 =
+ (PhoenixConnection) DriverManager.getConnection(
+ "jdbc:phoenix+rpc:" + masterHosts.replaceAll(":", "\\\\:"));) {
+ smoke(conn1);
+ smoke(conn2);
+ assertEquals(conn1.getQueryServices(), conn2.getQueryServices());
+ }
+
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index e342f50354..f8b46905d2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -29,8 +29,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import com.google.protobuf.RpcController;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.transaction.PhoenixTransactionClient;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
index 7b16c145cc..58282ef6ce 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
@@ -17,21 +17,31 @@
*/
package org.apache.phoenix.end2end;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.mapreduce.PhoenixTTLTool;
import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.ConfigurationFactory;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.InstanceResolver;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
@@ -39,15 +49,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
@Category(NeedsOwnMiniClusterTest.class)
public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
@@ -58,6 +59,22 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
props.put(QueryServices.PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED, Boolean.toString(true));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
+ InstanceResolver.clearSingletons();
+ // Make sure the ConnectionInfo in the tool doesn't try to pull a default Configuration
+ InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+ @Override
+ public Configuration getConfiguration() {
+ return new Configuration(config);
+ }
+
+ @Override
+ public Configuration getConfiguration(Configuration confToClone) {
+ Configuration copy = new Configuration(config);
+ copy.addResource(confToClone);
+ return copy;
+ }
+ });
}
private final long PHOENIX_TTL_EXPIRE_IN_A_SECOND = 1;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
index a1f1892982..4c002cfaaa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
@@ -55,9 +55,9 @@ import org.apache.phoenix.compat.hbase.CompatUtil;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.UpgradeRequiredException;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -112,7 +112,7 @@ public class SystemTablesCreationOnConnectionIT {
private static class PhoenixSysCatCreationServices extends ConnectionQueryServicesImpl {
PhoenixSysCatCreationServices(QueryServices services,
- PhoenixEmbeddedDriver.ConnectionInfo connectionInfo, Properties info) {
+ ConnectionInfo connectionInfo, Properties info) {
super(services, connectionInfo, info);
}
@@ -156,9 +156,9 @@ public class SystemTablesCreationOnConnectionIT {
@Override // public for testing
public synchronized ConnectionQueryServices getConnectionQueryServices(String url,
Properties info) throws SQLException {
+ QueryServicesTestImpl qsti = new QueryServicesTestImpl(getDefaultProps(), overrideProps);
if (cqs == null) {
- cqs = new PhoenixSysCatCreationServices(new QueryServicesTestImpl(getDefaultProps(),
- overrideProps), ConnectionInfo.create(url), info);
+ cqs = new PhoenixSysCatCreationServices(qsti, ConnectionInfo.create(url, qsti.getProps(), info), info);
cqs.init(url, info);
}
return cqs;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
index 9c4d03b536..647e3aae14 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesUpgradeIT.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -93,7 +93,7 @@ public class SystemTablesUpgradeIT extends BaseTest {
@Override // public for testing
public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
if (cqs == null) {
- cqs = new PhoenixUpgradeCountingServices(new QueryServicesTestImpl(getDefaultProps(), overrideProps), ConnectionInfo.create(url), info);
+ cqs = new PhoenixUpgradeCountingServices(new QueryServicesTestImpl(getDefaultProps(), overrideProps), ConnectionInfo.create(url, null, null), info);
cqs.init(url, info);
} else if (reinitialize) {
cqs.init(url, info);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 4b9cc9f9a8..624c4ab0bc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -86,7 +86,8 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
/**
@@ -100,6 +101,9 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class MutableIndexFailureIT extends BaseTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MutableIndexFailureIT.class);
+
public static volatile boolean FAIL_WRITE = false;
public static volatile String fullTableName;
@@ -473,6 +477,7 @@ public class MutableIndexFailureIT extends BaseTest {
}
conn.commit();
} catch (SQLException e) {
+ LOGGER.warn("Error while adding row", e);
return false;
}
return true;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
index 4ccc3d4cae..252e243357 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end.transform;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -28,11 +29,13 @@ import org.apache.phoenix.end2end.index.SingleCellIndexIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConfigurationFactory;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.transform.SystemTransformRecord;
import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.InstanceResolver;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
@@ -77,6 +80,22 @@ public class TransformMonitorExtendedIT extends BaseTest {
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
new ReadOnlyProps(clientProps.entrySet().iterator()));
+
+ InstanceResolver.clearSingletons();
+ // Make sure the ConnectionInfo in the tool doesn't try to pull a default Configuration
+ InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+ @Override
+ public Configuration getConfiguration() {
+ return new Configuration(config);
+ }
+
+ @Override
+ public Configuration getConfiguration(Configuration confToClone) {
+ Configuration copy = new Configuration(config);
+ copy.addResource(confToClone);
+ return copy;
+ }
+ });
}
public TransformMonitorExtendedIT() throws IOException, InterruptedException {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
index 774b8947d8..18b6a14be6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
@@ -204,7 +204,7 @@ public class FailoverPhoenixConnectionIT {
try (Connection conn = createFailoverConnection()) {
FailoverPhoenixConnection failoverConn = (FailoverPhoenixConnection) conn;
- assertEquals(CLUSTERS.getUrl2(), failoverConn.getWrappedConnection().getURL());
+ assertEquals(CLUSTERS.getJdbcUrl2(), failoverConn.getWrappedConnection().getURL());
doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
}
});
@@ -237,7 +237,7 @@ public class FailoverPhoenixConnectionIT {
try (Connection conn = createFailoverConnection()) {
FailoverPhoenixConnection failoverConn = (FailoverPhoenixConnection) conn;
- assertEquals(CLUSTERS.getUrl2(), failoverConn.getWrappedConnection().getURL());
+ assertEquals(CLUSTERS.getJdbcUrl2(), failoverConn.getWrappedConnection().getURL());
doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
}
});
@@ -286,7 +286,7 @@ public class FailoverPhoenixConnectionIT {
LOG.info("Testing failover connection when both clusters are up and running");
try (Connection conn = createFailoverConnection()) {
FailoverPhoenixConnection failoverConn = conn.unwrap(FailoverPhoenixConnection.class);
- assertEquals(CLUSTERS.getUrl2(), failoverConn.getWrappedConnection().getURL());
+ assertEquals(CLUSTERS.getJdbcUrl2(), failoverConn.getWrappedConnection().getURL());
doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
}
@@ -398,7 +398,7 @@ public class FailoverPhoenixConnectionIT {
// The wrapped connection is still against the first cluster, and is closed
PhoenixConnection pc = ((FailoverPhoenixConnection)conn).getWrappedConnection();
assertNotNull(pc);
- assertEquals(CLUSTERS.getUrl1(), pc.getURL());
+ assertEquals(CLUSTERS.getJdbcUrl1(), pc.getURL());
assertTrue(pc.isClosed());
doTestActionShouldFailBecauseOfFailover(conn::createStatement);
}
@@ -485,7 +485,7 @@ public class FailoverPhoenixConnectionIT {
public void testFailoverCanFinishWhenOneConnectionGotStuckClosing() throws Exception {
Connection conn = createFailoverConnection();
doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
- assertEquals(CLUSTERS.getUrl1(), // active connection is against the first cluster
+ assertEquals(CLUSTERS.getJdbcUrl1(), // active connection is against the first cluster
conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
// Spy the wrapped connection
@@ -518,7 +518,7 @@ public class FailoverPhoenixConnectionIT {
try (Connection conn2 = createFailoverConnection()) {
doTestBasicOperationsWithConnection(conn2, tableName, haGroupName);
- assertEquals(CLUSTERS.getUrl2(), // active connection is against the second cluster
+ assertEquals(CLUSTERS.getJdbcUrl2(), // active connection is against the second cluster
conn2.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
}
@@ -682,7 +682,7 @@ public class FailoverPhoenixConnectionIT {
public void testFailoverTwice() throws Exception {
try (Connection conn = createFailoverConnection()) {
doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
- assertEquals(CLUSTERS.getUrl1(), // active connection is against the first cluster
+ assertEquals(CLUSTERS.getJdbcUrl1(), // active connection is against the first cluster
conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
}
@@ -691,7 +691,7 @@ public class FailoverPhoenixConnectionIT {
try (Connection conn = createFailoverConnection()) {
doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
- assertEquals(CLUSTERS.getUrl2(), // active connection is against the second cluster
+ assertEquals(CLUSTERS.getJdbcUrl2(), // active connection is against the second cluster
conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
}
@@ -700,7 +700,7 @@ public class FailoverPhoenixConnectionIT {
try (Connection conn = createFailoverConnection()) {
doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
- assertEquals(CLUSTERS.getUrl1(), // active connection is against the first cluster
+ assertEquals(CLUSTERS.getJdbcUrl1(), // active connection is against the first cluster
conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
index 4435236625..f7a99ba92a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
@@ -327,7 +327,7 @@ public class HighAvailabilityGroupIT {
*/
@Test
public void testConnectToOneCluster() throws SQLException {
- final String url = CLUSTERS.getUrl1();
+ final String url = CLUSTERS.getJdbcUrl1();
PhoenixConnection connection = haGroup.connectToOneCluster(url, clientProperties);
assertEquals(url, connection.getURL());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
index 16f9e9af6a..cc9b703005 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java
@@ -80,8 +80,8 @@ public class HighAvailabilityGroupTestIT {
// This test cannot be run in parallel since it registers/deregisters driver
private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroupTestIT.class);
- private static final String ZK1 = "zk1-1,zk1-2:2181:/hbase";
- private static final String ZK2 = "zk2-1,zk2-2:2181:/hbase";
+ private static final String ZK1 = "zk1-1\\:2181,zk1-2\\:2181::/hbase";
+ private static final String ZK2 = "zk2-1\\:2181,zk2-2\\:2181::/hbase";
private static final PhoenixEmbeddedDriver DRIVER = mock(PhoenixEmbeddedDriver.class);
/** The client properties to create a JDBC connection. */
@@ -232,9 +232,9 @@ public class HighAvailabilityGroupTestIT {
public void testConnectToOneClusterShouldNotFailWithDifferentHostOrderJdbcString() throws SQLException {
// test with JDBC string
final String hosts = "zk1-2,zk1-1:2181:/hbase";
- final String jdbcString = String.format("jdbc:phoenix:%s", hosts);
+ final String jdbcString = String.format("jdbc:phoenix+zk:%s", hosts);
haGroup.connectToOneCluster(jdbcString, clientProperties);
- verify(DRIVER, times(1)).getConnectionQueryServices(eq(String.format("jdbc:phoenix:%s",ZK1)), eq(clientProperties));
+ verify(DRIVER, times(1)).getConnectionQueryServices(eq(String.format("jdbc:phoenix+zk:%s",ZK1)), eq(clientProperties));
}
/**
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
index d604774e50..1cb6c0ba79 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
@@ -78,9 +78,9 @@ public class HighAvailabilityTestingUtility {
public static class HBaseTestingUtilityPair implements Closeable {
private final HBaseTestingUtility hbaseCluster1 = new HBaseTestingUtility();
private final HBaseTestingUtility hbaseCluster2 = new HBaseTestingUtility();
- /** The host:port:/hbase format of the JDBC string for HBase cluster 1. */
+ /** The host\:port::/hbase format of the JDBC string for HBase cluster 1. */
private String url1;
- /** The host:port:/hbase format of the JDBC string for HBase cluster 2. */
+ /** The host\:port::/hbase format of the JDBC string for HBase cluster 2. */
private String url2;
private PhoenixHAAdminHelper haAdmin1;
private PhoenixHAAdminHelper haAdmin2;
@@ -114,8 +114,8 @@ public class HighAvailabilityTestingUtility {
String confAddress1 = hbaseCluster1.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM);
String confAddress2 = hbaseCluster2.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM);
- url1 = String.format("%s:%d:/hbase", confAddress1, hbaseCluster1.getZkCluster().getClientPort());
- url2 = String.format("%s:%d:/hbase", confAddress2, hbaseCluster2.getZkCluster().getClientPort());
+ url1 = String.format("%s\\:%d::/hbase", confAddress1, hbaseCluster1.getZkCluster().getClientPort());
+ url2 = String.format("%s\\:%d::/hbase", confAddress2, hbaseCluster2.getZkCluster().getClientPort());
haAdmin1 = new PhoenixHAAdminHelper(getUrl1(), hbaseCluster1.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
haAdmin2 = new PhoenixHAAdminHelper(getUrl2(), hbaseCluster2.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
@@ -354,7 +354,7 @@ public class HighAvailabilityTestingUtility {
}
public String getJdbcUrl(String zkUrl) {
- return String.format("jdbc:phoenix:%s:%s", zkUrl, PRINCIPAL);
+ return String.format("jdbc:phoenix+zk:%s:%s", zkUrl, PRINCIPAL);
}
public String getUrl1() {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index 14127f111e..7ecacd2261 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -151,12 +151,14 @@ public class ParallelPhoenixConnectionIT {
Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsi.getUserName());
PhoenixConnection pConn = pr.getFutureConnection1().get();
ConnectionQueryServices cqsiFromConn = pConn.getQueryServices();
+ Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsiFromConn.getUserName());
Assert.assertTrue(cqsi == cqsiFromConn);
// verify connection#2
cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl2(), clientProperties);
Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsi.getUserName());
pConn = pr.getFutureConnection2().get();
cqsiFromConn = pConn.getQueryServices();
+ Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsiFromConn.getUserName());
Assert.assertTrue(cqsi == cqsiFromConn);
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/SecureUserConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/SecureUserConnectionsIT.java
index 630590f50c..47d7c0424b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/SecureUserConnectionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/SecureUserConnectionsIT.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.query.ConfigurationFactory;
import org.apache.phoenix.util.InstanceResolver;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -235,7 +234,7 @@ public class SecureUserConnectionsIT {
PrivilegedExceptionAction<Void> callable = new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
String url = joinUserAuthentication(BASE_URL, princ1, keytab1);
- connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url,ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
return null;
}
};
@@ -261,7 +260,7 @@ public class SecureUserConnectionsIT {
PrivilegedExceptionAction<Void> callable = new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
String url = joinUserAuthentication(BASE_URL, princ1, keytab1);
- connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
return null;
}
};
@@ -293,7 +292,7 @@ public class SecureUserConnectionsIT {
ugi1.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
String url = joinUserAuthentication(BASE_URL, princ1, keytab1);
- connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
return null;
}
});
@@ -304,7 +303,7 @@ public class SecureUserConnectionsIT {
ugi2.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
String url = joinUserAuthentication(BASE_URL, princ2, keytab2);
- connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
return null;
}
});
@@ -314,7 +313,7 @@ public class SecureUserConnectionsIT {
ugi1.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
String url = joinUserAuthentication(BASE_URL, princ1, keytab1);
- connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
return null;
}
});
@@ -334,19 +333,19 @@ public class SecureUserConnectionsIT {
UserGroupInformation.loginUserFromKeytab(princ1, keytab1.getPath());
// Using the same UGI should result in two equivalent ConnectionInfo objects
- connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(1, connections.size());
// Sanity check
verifyAllConnectionsAreKerberosBased(connections);
UserGroupInformation.loginUserFromKeytab(princ2, keytab2.getPath());
- connections.add(ConnectionInfo.create(url2).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url2, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(2, connections.size());
verifyAllConnectionsAreKerberosBased(connections);
// Because the UGI instances are unique, so are the connections
UserGroupInformation.loginUserFromKeytab(princ1, keytab1.getPath());
- connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(3, connections.size());
verifyAllConnectionsAreKerberosBased(connections);
}
@@ -360,13 +359,13 @@ public class SecureUserConnectionsIT {
UserGroupInformation.loginUserFromKeytab(princ1, keytab1.getPath());
// Using the same UGI should result in two equivalent ConnectionInfo objects
- connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(1, connections.size());
// Sanity check
verifyAllConnectionsAreKerberosBased(connections);
// Because the UGI instances are unique, so are the connections
- connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(1, connections.size());
}
@@ -378,13 +377,13 @@ public class SecureUserConnectionsIT {
// Using the same UGI should result in two equivalent ConnectionInfo objects
final String url = joinUserAuthentication(BASE_URL, princ1, keytab1);
- connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(1, connections.size());
// Sanity check
verifyAllConnectionsAreKerberosBased(connections);
// Because the UGI instances are unique, so are the connections
- connections.add(ConnectionInfo.create(url).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(1, connections.size());
}
@@ -399,18 +398,18 @@ public class SecureUserConnectionsIT {
final String url2 = joinUserAuthentication(BASE_URL, princ2, keytab2);
// Using the same UGI should result in two equivalent ConnectionInfo objects
- connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(1, connections.size());
// Sanity check
verifyAllConnectionsAreKerberosBased(connections);
// Because the UGI instances are unique, so are the connections
- connections.add(ConnectionInfo.create(url2).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url2, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(2, connections.size());
verifyAllConnectionsAreKerberosBased(connections);
// Using the same UGI should result in two equivalent ConnectionInfo objects
- connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(3, connections.size());
// Sanity check
verifyAllConnectionsAreKerberosBased(connections);
@@ -427,29 +426,29 @@ public class SecureUserConnectionsIT {
final String url2 = joinUserAuthentication(BASE_URL, princ2, keytab2);
// Using the same UGI should result in two equivalent ConnectionInfo objects
- connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(1, connections.size());
// Sanity check
verifyAllConnectionsAreKerberosBased(connections);
// Logging in as the same user again should not duplicate connections
- connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(1, connections.size());
// Sanity check
verifyAllConnectionsAreKerberosBased(connections);
// Add a second one.
- connections.add(ConnectionInfo.create(url2).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url2, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(2, connections.size());
verifyAllConnectionsAreKerberosBased(connections);
// Again, verify this user is not duplicated
- connections.add(ConnectionInfo.create(url2).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url2, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(2, connections.size());
verifyAllConnectionsAreKerberosBased(connections);
// Because the UGI instances are unique, so are the connections
- connections.add(ConnectionInfo.create(url1).normalize(ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
+ connections.add(ConnectionInfo.create(url1, ReadOnlyProps.EMPTY_PROPS, EMPTY_PROPERTIES));
assertEquals(3, connections.size());
verifyAllConnectionsAreKerberosBased(connections);
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index f936796d34..441edcee7d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -29,8 +29,8 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.CommitException;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.ConfigurationFactory;
@@ -1478,7 +1478,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
private static class PhoenixMetricsTestingQueryServices extends ConnectionQueryServicesImpl {
PhoenixMetricsTestingQueryServices(QueryServices services,
- PhoenixEmbeddedDriver.ConnectionInfo connectionInfo, Properties info) {
+ ConnectionInfo connectionInfo, Properties info) {
super(services, connectionInfo, info);
}
@@ -1516,10 +1516,12 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
@Override public synchronized ConnectionQueryServices getConnectionQueryServices(String url,
Properties info) throws SQLException {
if (cqs == null) {
+ QueryServicesTestImpl qsti =
+ new QueryServicesTestImpl(getDefaultProps(), overrideProps);
cqs =
new PhoenixMetricsTestingQueryServices(
- new QueryServicesTestImpl(getDefaultProps(), overrideProps),
- ConnectionInfo.create(url), info);
+ qsti,
+ ConnectionInfo.create(url, qsti.getProps(), info), info);
cqs.init(url, info);
}
return cqs;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilterIT.java
index 943e21cf4b..cbeaedf682 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilterIT.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.replication;
import java.io.IOException;
+import java.sql.DriverManager;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
@@ -392,17 +393,14 @@ public class SystemCatalogWALEntryFilterIT extends ParallelStatsDisabledIT {
private static void dropTenantView() throws Exception {
Properties tenantProperties = new Properties();
tenantProperties.setProperty("TenantId", TENANT_ID);
- try (java.sql.Connection connection =
- ConnectionUtil.getInputConnection(getUtility().getConfiguration(), tenantProperties)) {
+ try (java.sql.Connection connection = DriverManager.getConnection(getUrl(), tenantProperties)) {
connection.createStatement().execute(DROP_TENANT_VIEW_SQL);
connection.commit();
}
}
private static void dropNonTenantView() throws Exception {
- try (java.sql.Connection connection =
- ConnectionUtil.getInputConnection(getUtility().getConfiguration(), new Properties())) {
-
+ try (java.sql.Connection connection = DriverManager.getConnection(getUrl())) {
connection.createStatement().execute(DROP_NONTENANT_VIEW_SQL);
}
}
@@ -410,16 +408,14 @@ public class SystemCatalogWALEntryFilterIT extends ParallelStatsDisabledIT {
private static void createTenantView() throws Exception {
Properties tenantProperties = new Properties();
tenantProperties.setProperty("TenantId", TENANT_ID);
- try (java.sql.Connection connection =
- ConnectionUtil.getInputConnection(getUtility().getConfiguration(), tenantProperties)) {
+ try (java.sql.Connection connection = DriverManager.getConnection(getUrl(), tenantProperties)) {
connection.createStatement().execute(CREATE_TENANT_VIEW_SQL);
connection.commit();
}
}
private static void createNonTenantView() throws Exception {
- try (java.sql.Connection connection =
- ConnectionUtil.getInputConnection(getUtility().getConfiguration(), new Properties())) {
+ try (java.sql.Connection connection = DriverManager.getConnection(getUrl())) {
connection.createStatement().execute(CREATE_NONTENANT_VIEW_SQL);
connection.commit();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/AbstractRPCConnectionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/AbstractRPCConnectionInfo.java
new file mode 100644
index 0000000000..869f40d2d5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/AbstractRPCConnectionInfo.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+/**
+ * Encapsulates the common logic for HRPC based ConnectionInfo classes.
+ *
+ */
+public abstract class AbstractRPCConnectionInfo extends ConnectionInfo {
+
+ private static final String MASTER_ADDRS_KEY = "hbase.masters";
+ private static final String MASTER_HOSTNAME_KEY = "hbase.master.hostname";
+
+ protected String bootstrapServers;
+
+ /**
+ * Returns the value of the {@code bootstrapServers} field.
+ * NOTE(review): the method name is spelled "Boostrap"; it is public, so renaming it
+ * would break callers.
+ */
+ public String getBoostrapServers() {
+ return bootstrapServers;
+ }
+
+ /**
+ * Passes all arguments through to the {@link ConnectionInfo} constructor.
+ * {@code bootstrapServers} is not set here.
+ */
+ protected AbstractRPCConnectionInfo(boolean isConnectionless, String principal, String keytab,
+ User user, String haGroup) {
+ super(isConnectionless, principal, keytab, user, haGroup);
+ }
+
+ /**
+ * Not supported by this class: always throws.
+ * @throws UnsupportedOperationException on every call
+ */
+ @Override
+ public String getZookeeperConnectionString() {
+ throw new UnsupportedOperationException("MasterRegistry is used");
+ }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}: combines the parent's hash (principal,
+     * keytab, haGroup and user) with the bootstrap server list.
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        // equals() delegates to super.equals(), so seed from super.hashCode(). Otherwise every
+        // info pointing at the same servers hashes identically, regardless of user or principal.
+        int result = super.hashCode();
+        result = prime * result + ((bootstrapServers == null) ? 0 : bootstrapServers.hashCode());
+        // Port is already provided in or normalized into bootstrapServers
+        return result;
+    }
+
+    /**
+     * Two instances are equal when {@code super.equals} accepts them and their bootstrap server
+     * lists are both null or equal strings.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        // The cast below relies on super.equals() having accepted obj.
+        if (!super.equals(obj)) {
+            return false;
+        }
+        // Port is already provided in or normalized into bootstrapServers
+        String otherServers = ((AbstractRPCConnectionInfo) obj).bootstrapServers;
+        return bootstrapServers == null
+                ? otherServers == null
+                : bootstrapServers.equals(otherServers);
+    }
+
+    /**
+     * Renders the bootstrap servers (each ':' written as '\:'), followed by
+     * {@code :::principal} when a principal or keytab is present and {@code :keytab} when a
+     * keytab is present. A null server list is rendered as an empty string instead of throwing.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        if (bootstrapServers != null) {
+            // Literal replacement; no regular expression is needed for a single character.
+            sb.append(bootstrapServers.replace(":", "\\:"));
+        }
+        if (anyNotNull(principal, keytab)) {
+            sb.append(":::");
+            if (principal != null) {
+                sb.append(principal);
+            }
+        }
+        if (anyNotNull(keytab)) {
+            sb.append(':');
+            if (keytab != null) {
+                sb.append(keytab);
+            }
+        }
+        return sb.toString();
+    }
+
+ /**
+ * Abstract Builder parent for HRPC based ConnectionInfo classes.
+ *
+ * @since 138
+ */
+ protected abstract static class Builder extends ConnectionInfo.Builder {
+ String hostsList;
+ String portString;
+ Integer port;
+
+ /** Passes all arguments through to the {@code ConnectionInfo.Builder} constructor. */
+ public Builder(String url, Configuration config, ReadOnlyProps props, Properties info) {
+ super(url, config, props, info);
+ }
+
+ /**
+ * Runs the builder steps in a fixed order: parse the URL, normalize, handle Kerberos and
+ * login, set the HA group, then build the ConnectionInfo.
+ * @throws SQLException propagated from any of the steps
+ */
+ @Override
+ protected ConnectionInfo create() throws SQLException {
+ parse();
+ normalize();
+ handleKerberosAndLogin();
+ setHaGroup();
+ return build();
+ }
+
+        /**
+         * Splits the URL (without the protocol) into its positional parts and stores them in
+         * {@code hostsList}, {@code portString}/{@code port}, {@code principal} and
+         * {@code keytab}. The third position must be absent.
+         * @throws SQLException if the URL has too many parts, a non-numeric or negative port,
+         *             or a value in the third position
+         */
+        private void parse() throws SQLException {
+            StringTokenizer tokenizer = getTokenizerWithoutProtocol();
+
+            // Unlike for the ZK URL, there are no heuristics to figure out missing parts.
+            // Unspecified parts inside the URL must be indicated by ::.
+            boolean wasSeparator = true;
+            boolean first = true;
+            ArrayList<String> parts = new ArrayList<>(7);
+            String token;
+            while (tokenizer.hasMoreTokens()
+                    && !(token = tokenizer.nextToken()).equals(TERMINATOR)) {
+                if (DELIMITERS.contains(token)) {
+                    // Two separators in a row (other than a leading one) mark an omitted part.
+                    if (wasSeparator && !first) {
+                        parts.add(null);
+                    }
+                    wasSeparator = true;
+                } else {
+                    parts.add(token);
+                    wasSeparator = false;
+                }
+                first = false;
+                if (parts.size() > 6) {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+
+            if (parts.size() == 6) {
+                // We could check for FileSystems.getDefault().getSeparator()), but then
+                // we wouldn't be able to test on Unix.
+                if (parts.get(5).startsWith("\\")) {
+                    // Reconstruct windows path
+                    parts.set(4, parts.get(4) + ":" + parts.get(5));
+                    parts.remove(5);
+                } else {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+
+            // Pad with nulls so the positional reads below never go out of bounds.
+            while (parts.size() < 7) {
+                parts.add(null);
+            }
+            hostsList = parts.get(0);
+            portString = parts.get(1);
+            if (portString != null) {
+                try {
+                    port = Integer.parseInt(portString);
+                } catch (NumberFormatException e) {
+                    throw getMalFormedUrlException(url);
+                }
+                if (port < 0) {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+            if (parts.get(2) != null && !parts.get(2).isEmpty()) {
+                // This MUST be empty
+                throw getMalFormedUrlException(url);
+            }
+            principal = parts.get(3);
+            keytab = parts.get(4);
+        }
+
+        /**
+         * Normalizes the parsed host list and port. Empty strings become null, the port string
+         * is validated, and missing values are filled in via {@code getDefaultMasterPort()} and
+         * {@code getMasterAddr(int)}. The result is passed through {@code normalizeHostsList}.
+         * For a connectionless URL nothing is filled in; specifying hosts or a port is an error.
+         * @throws SQLException if the port is invalid, if a connectionless URL specifies hosts
+         *             or a port, or if no master address can be determined
+         */
+        protected void normalizeMaster() throws SQLException {
+            if (hostsList != null && hostsList.isEmpty()) {
+                hostsList = null;
+            }
+            if (portString != null && portString.isEmpty()) {
+                portString = null;
+            }
+            if (portString != null) {
+                try {
+                    port = Integer.parseInt(portString);
+                } catch (NumberFormatException e) {
+                    throw getMalFormedUrlException(url);
+                }
+                if (port < 0) {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+
+            // This check must run before the default port is filled in. Once port has a
+            // default it is never null, and every connectionless URL would be rejected.
+            if (isConnectionless) {
+                // We probably don't create connectionless MasterConnectionInfo objects
+                if (hostsList != null || port != null) {
+                    throw getMalFormedUrlException(url);
+                }
+                return;
+            }
+
+            if (port == null) {
+                port = getDefaultMasterPort();
+            }
+            // At this point, port is guaranteed to be non-null
+
+            if (hostsList == null) {
+                hostsList = getMasterAddr(port);
+                if (hostsList == null) {
+                    throw getMalFormedUrlException(
+                        "Hbase masters are not specified and in URL, and are not set in the configuration files: "
+                                + url);
+                }
+            } else {
+                hostsList = hostsList.replaceAll("=", ":");
+            }
+
+            hostsList = normalizeHostsList(hostsList, port);
+        }
+
+        /**
+         * Copied from org.apache.hadoop.hbase.client.MasterRegistry (which is private). Supplies
+         * the default master port, read via {@code get(HConstants.MASTER_PORT)}. Does not modify
+         * any builder field; the caller assigns the result.
+         * @return the configured port, or {@code HConstants.DEFAULT_MASTER_PORT} when the value
+         *         is unset or 0
+         * @throws NumberFormatException if the configured value is not an integer
+         */
+        private int getDefaultMasterPort() {
+            String configuredPort = get(HConstants.MASTER_PORT);
+            if (configuredPort == null) {
+                return HConstants.DEFAULT_MASTER_PORT;
+            }
+            int masterPort = Integer.parseInt(configuredPort);
+            // Master port may be set to 0. We should substitute the default port in that case.
+            return masterPort == 0 ? HConstants.DEFAULT_MASTER_PORT : masterPort;
+        }
+
+        /**
+         * Adopted from org.apache.hadoop.hbase.client.MasterRegistry. Looks up the master
+         * address list, falling back to {@code hostname:port} built from the master hostname.
+         * @param port port appended to the hostname in the fallback case
+         * @return the address string, or null when neither value is available
+         */
+        private String getMasterAddr(int port) {
+            String configuredMasters = get(MASTER_ADDRS_KEY);
+            if (!Strings.isNullOrEmpty(configuredMasters)) {
+                return configuredMasters;
+            }
+            String masterHost = get(MASTER_HOSTNAME_KEY);
+            return masterHost == null ? null : String.format("%s:%d", masterHost, port);
+        }
+
+ protected abstract ConnectionInfo build();
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
index 348cae0282..aa28c6e85f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
@@ -93,6 +93,7 @@ public class ClusterRoleRecord {
@JsonProperty("version") long version) {
this.haGroupName = haGroupName;
this.policy = policy;
+ // TODO: Do we really need to normalize here?
zk1 = JDBCUtil.formatZookeeperUrl(zk1);
zk2 = JDBCUtil.formatZookeeperUrl(zk2);
// Ignore the given order of url1 and url2
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ConnectionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ConnectionInfo.java
new file mode 100644
index 0000000000..b982b1d6a1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ConnectionInfo.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to encapsulate connection info for HBase
+ * @since 0.1.1
+ */
+public abstract class ConnectionInfo {
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ConnectionInfo.class);
+ protected static final Object KERBEROS_LOGIN_LOCK = new Object();
+ protected static final char WINDOWS_SEPARATOR_CHAR = '\\';
+ protected static final String REALM_EQUIVALENCY_WARNING_MSG =
+ "Provided principal does not contain a realm and the default realm cannot be"
+ + " determined. Ignoring realm equivalency check.";
+ protected static final String TERMINATOR = "" + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+ protected static final String DELIMITERS = TERMINATOR + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+ protected static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
+ "hbase.client.registry.impl";
+
+ protected static final boolean HAS_MASTER_REGISTRY;
+ protected static final boolean HAS_RPC_REGISTRY;
+
+    // Registry availability by HBase version: both from 3.x on; within 2.x the master
+    // registry needs at least 2.3.0 and the RPC registry at least 2.5.0.
+    static {
+        String version = VersionInfo.getVersion();
+        boolean isHBase3OrLater = VersionInfo.getMajorVersion(version) >= 3;
+        HAS_MASTER_REGISTRY =
+                isHBase3OrLater || VersionInfo.compareVersion(version, "2.3.0") >= 0;
+        HAS_RPC_REGISTRY =
+                isHBase3OrLater || VersionInfo.compareVersion(version, "2.5.0") >= 0;
+    }
+
+    /**
+     * Builds a MALFORMED_CONNECTION_URL SQLException whose message is the given text.
+     * @param url the text to use as the exception message
+     * @return the exception, for the caller to throw
+     */
+    protected static SQLException getMalFormedUrlException(String url) {
+        SQLExceptionInfo exceptionInfo =
+                new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+                        .setMessage(url)
+                        .build();
+        return exceptionInfo.buildException();
+    }
+
+ protected final boolean isConnectionless;
+ protected final String principal;
+ protected final String keytab;
+ protected final User user;
+ protected final String haGroup;
+
+    /** Stores the given values in the corresponding final fields. */
+    protected ConnectionInfo(boolean isConnectionless, String principal, String keytab, User user,
+            String haGroup) {
+        this.haGroup = haGroup;
+        this.user = user;
+        this.keytab = keytab;
+        this.principal = principal;
+        this.isConnectionless = isConnectionless;
+    }
+
+    /**
+     * Replaces every backslash-colon pair in the input with '='.
+     * @param escaped the string to process; must not be null
+     * @return the string with each "\:" replaced by "="
+     */
+    protected static String unescape(String escaped) {
+        // Plain literal replacement; equivalent to the regex form for this fixed pattern.
+        return escaped.replace("\\:", "=");
+    }
+
+    /**
+     * Creates a ConnectionInfo with {@code doNotLogin} set to true, using the Configuration
+     * obtained from the HBaseFactoryProvider's configuration factory.
+     */
+    public static ConnectionInfo createNoLogin(String url, ReadOnlyProps props, Properties info)
+            throws SQLException {
+        return create(url,
+            HBaseFactoryProvider.getConfigurationFactory().getConfiguration(),
+            props, info, true);
+    }
+
+    /**
+     * Creates a ConnectionInfo using the Configuration obtained from the
+     * HBaseFactoryProvider's configuration factory.
+     */
+    public static ConnectionInfo create(String url, ReadOnlyProps props, Properties info)
+            throws SQLException {
+        return create(url,
+            HBaseFactoryProvider.getConfigurationFactory().getConfiguration(),
+            props, info);
+    }
+
+ /** Delegates to the five-argument {@code create} with {@code doNotLogin} set to true. */
+ public static ConnectionInfo createNoLogin(String url, Configuration configuration,
+ ReadOnlyProps props, Properties info) throws SQLException {
+ return create(url, configuration, props, info, true);
+ }
+
+ /** Delegates to the five-argument {@code create} with {@code doNotLogin} set to false. */
+ public static ConnectionInfo create(String url, Configuration configuration,
+ ReadOnlyProps props, Properties info) throws SQLException {
+ return create(url, configuration, props, info, false);
+ }
+
+    /**
+     * Creates a ConnectionInfo for the URL. The builder is chosen from the URL's protocol
+     * prefix; for the generic protocol it is chosen from the registry set in the
+     * configuration, with a version-dependent default.
+     * @param url JDBC URL; null or empty is treated as the bare generic protocol, and a URL
+     *            without the protocol prefix gets it prepended
+     * @param configuration configuration to use; when null, the one from the
+     *            HBaseFactoryProvider's configuration factory is used
+     * @param props passed to the builder
+     * @param info passed to the builder
+     * @param doNotLogin passed to the builder via {@code setDoNotLogin}
+     * @return the ConnectionInfo produced by the selected builder
+     * @throws SQLException if the URL is malformed or the builder fails
+     */
+    public static ConnectionInfo create(String url, Configuration configuration,
+            ReadOnlyProps props, Properties info, boolean doNotLogin) throws SQLException {
+        // registry-independent URL preprocessing
+        String fullUrl = unescape(url == null ? "" : url);
+
+        // Assume missing prefix
+        if (fullUrl.isEmpty()) {
+            fullUrl = PhoenixRuntime.JDBC_PROTOCOL;
+        }
+        if (!fullUrl.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+            fullUrl =
+                    PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+                            + fullUrl;
+        }
+
+        Configuration conf = configuration;
+        if (conf == null) {
+            conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        }
+
+        // Lower-case once; every prefix test below is case-insensitive.
+        final String lowerCaseUrl = fullUrl.toLowerCase();
+        final Builder builder;
+
+        if (lowerCaseUrl.startsWith(PhoenixRuntime.JDBC_PROTOCOL_ZK)) {
+            builder = new ZKConnectionInfo.Builder(fullUrl, conf, props, info);
+        } else if (lowerCaseUrl.startsWith(PhoenixRuntime.JDBC_PROTOCOL_MASTER)) {
+            builder = new MasterConnectionInfo.Builder(fullUrl, conf, props, info);
+        } else if (lowerCaseUrl.startsWith(PhoenixRuntime.JDBC_PROTOCOL_RPC)) {
+            builder = new RPCConnectionInfo.Builder(fullUrl, conf, props, info);
+        } else if (!lowerCaseUrl.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+            throw getMalFormedUrlException(fullUrl);
+        } else if (MasterConnectionInfo.isMaster(conf)) {
+            // The generic protocol was specified. Try to determine the protocol from the config
+            builder = new MasterConnectionInfo.Builder(fullUrl, conf, props, info);
+        } else if (RPCConnectionInfo.isRPC(conf)) {
+            builder = new RPCConnectionInfo.Builder(fullUrl, conf, props, info);
+        } else if (ZKConnectionInfo.isZK(conf)) {
+            builder = new ZKConnectionInfo.Builder(fullUrl, conf, props, info);
+        } else if (VersionInfo.getMajorVersion(VersionInfo.getVersion()) >= 3) {
+            // No registry class set in config. Use version-dependent default
+            builder = new RPCConnectionInfo.Builder(fullUrl, conf, props, info);
+        } else {
+            builder = new ZKConnectionInfo.Builder(fullUrl, conf, props, info);
+        }
+
+        builder.setDoNotLogin(doNotLogin);
+        return builder.create();
+    }
+
+    /**
+     * Re-joins a Windows keytab path that was split at its drive-letter colon. Only lists of
+     * exactly seven parts are touched: when the seventh part starts with a backslash it is
+     * appended to the sixth after a ':' and removed; otherwise the URL is rejected.
+     * @param url the URL, used only for the error message
+     * @param parts the parsed URL parts; modified in place
+     * @return the same list instance
+     * @throws SQLException if there are seven parts and the last is null or is not a
+     *             backslash-prefixed path remainder
+     */
+    protected static List<String> handleWindowsKeytab(String url, List<String> parts)
+            throws SQLException {
+        if (parts.size() != 7) {
+            return parts;
+        }
+        // We could check for FileSystems.getDefault().getSeparator()), but then
+        // we wouldn't be able to test on Unix.
+        String pathRemainder = parts.get(6);
+        if (pathRemainder == null || !pathRemainder.startsWith("\\")) {
+            throw getMalFormedUrlException(url);
+        }
+        // Reconstruct windows path
+        parts.set(5, parts.get(5) + ":" + pathRemainder);
+        parts.remove(6);
+        return parts;
+    }
+
+ // Visible for testing
+ /**
+ * Compares the two principal names with a null hostname and the default Kerberos realm
+ * returned by {@link #getDefaultKerberosRealm()}.
+ */
+ static boolean isSameName(String currentName, String newName) throws IOException {
+ return isSameName(currentName, newName, null, getDefaultKerberosRealm());
+ }
+
+    /**
+     * Returns the default Kerberos realm reported by KerberosUtil. Any failure is logged
+     * (with the stack trace at DEBUG, message only at WARN) and null is returned.
+     * @return The default kerberos realm, or null.
+     */
+    static String getDefaultKerberosRealm() {
+        try {
+            return KerberosUtil.getDefaultRealm();
+        } catch (Exception e) {
+            if (!LOGGER.isDebugEnabled()) {
+                // Limit the content at WARN
+                LOGGER.warn(REALM_EQUIVALENCY_WARNING_MSG);
+            } else {
+                // Include the stacktrace at DEBUG
+                LOGGER.debug(REALM_EQUIVALENCY_WARNING_MSG, e);
+            }
+            return null;
+        }
+    }
+
+ /**
+ * Compares the two principal names using the given hostname and the default Kerberos realm
+ * returned by {@link #getDefaultKerberosRealm()}.
+ */
+ static boolean isSameName(String currentName, String newName, String hostname)
+ throws IOException {
+ return isSameName(currentName, newName, hostname, getDefaultKerberosRealm());
+ }
+
+    /**
+     * Checks whether {@code newName} denotes the same principal as {@code currentName}.
+     * A "_HOST" placeholder in {@code newName} is resolved first; if {@code newName} has no
+     * realm and a default realm is given, that realm is appended before comparing.
+     * @param currentName principal to compare against
+     * @param newName candidate principal, possibly without a realm or with "_HOST"
+     * @param hostname value substituted for "_HOST"
+     * @param defaultRealm realm appended to a realm-less {@code newName}; may be null
+     * @return true if the names are equal after these adjustments
+     * @throws IOException declared for the SecurityUtil.getServerPrincipal call
+     */
+    static boolean isSameName(String currentName, String newName, String hostname,
+            String defaultRealm) throws IOException {
+        // Decided on the name as passed in, before any placeholder substitution.
+        final boolean hasRealm = newName.indexOf('@') >= 0;
+        String candidate = newName;
+        // Make sure to replace "_HOST" if it exists before comparing the principals.
+        if (candidate.contains(org.apache.hadoop.security.SecurityUtil.HOSTNAME_PATTERN)) {
+            if (hasRealm) {
+                candidate =
+                        org.apache.hadoop.security.SecurityUtil.getServerPrincipal(candidate,
+                            hostname);
+            } else if (candidate.endsWith("/_HOST")) {
+                // Drop the trailing "_HOST" (keeping the '/') and append the hostname.
+                candidate = candidate.substring(0, candidate.length() - 5) + hostname;
+            }
+        }
+        if (hasRealm || defaultRealm == null) {
+            // We expect both names to contain a realm, so we can do a simple equality check
+            return currentName.equals(candidate);
+        }
+        // The new name doesn't contain a realm and we could compute a default realm
+        return currentName.equals(candidate + "@" + defaultRealm);
+    }
+
+    /**
+     * Create a new Configuration object that merges the CQS properties and the Connection
+     * properties into the HBase configuration object. Later sources override earlier ones:
+     * {@code configIn}, then {@code props}, then {@code info}.
+     * @param configIn base configuration to copy; when null, an empty Configuration that
+     *            loads no default resources is used
+     * @param props CQS properties; may be null
+     * @param info JDBC connection properties; may be null. Only String keys with String
+     *            values are copied, including those inherited from the Properties defaults
+     * @return merged configuration
+     */
+    protected static Configuration mergeConfiguration(Configuration configIn, ReadOnlyProps props,
+            Properties info) {
+        // TODO is cloning the configuration a performance problem ?
+        Configuration config;
+        if (configIn != null) {
+            config = new Configuration(configIn);
+        } else {
+            // props/info contains everything
+            config = new Configuration(false);
+        }
+        // Add QueryServices properties
+        if (props != null) {
+            for (Entry<String, String> entry : props) {
+                config.set(entry.getKey(), entry.getValue());
+            }
+        }
+        // Add any user-provided properties (via DriverManager)
+        if (info != null) {
+            // stringPropertyNames() returns only String keys with String values and includes
+            // the Properties defaults, so no cast can fail and no null value reaches set().
+            for (String key : info.stringPropertyNames()) {
+                config.set(key, info.getProperty(key));
+            }
+        }
+        return config;
+    }
+
+ protected Map<String, String> getCommonProps() {
+ Map<String, String> connectionProps = new HashMap<>();
+ if (getPrincipal() != null && getKeytab() != null) {
+ connectionProps.put(QueryServices.HBASE_CLIENT_PRINCIPAL, getPrincipal());
+ connectionProps.put(QueryServices.HBASE_CLIENT_KEYTAB, getKeytab());
+ }
+ return connectionProps;
+ }
+
+ public abstract ReadOnlyProps asProps();
+
+ public boolean isConnectionless() { // Accessor for the isConnectionless flag of this connection info.
+ return isConnectionless;
+ }
+
+ public String getKeytab() { // Accessor for the keytab field; may be null (equals/hashCode null-check it).
+ return keytab;
+ }
+
+ public String getPrincipal() { // Accessor for the principal field; may be null (equals/hashCode null-check it).
+ return principal;
+ }
+
+ public User getUser() { // Accessor for the user field, which equals()/hashCode() treat as never null.
+ return user;
+ }
+
+ public String getHaGroup() { // Accessor for the haGroup field; may be null (equals/hashCode null-check it).
+ return haGroup;
+ }
+
+ public abstract String toUrl();
+
+ public abstract String getZookeeperConnectionString();
+
+ /**
+  * Two instances are equal when they have exactly the same class and equal user,
+  * principal, keytab and HA group values.
+  */
+ @Override
+ public boolean equals(Object obj) {
+     if (this == obj) {
+         return true;
+     }
+     if (obj == null) {
+         return false;
+     }
+     if (getClass() != obj.getClass()) {
+         return false;
+     }
+     final ConnectionInfo that = (ConnectionInfo) obj;
+     // `user` is guaranteed to be non-null
+     if (!that.user.equals(user)) {
+         return false;
+     }
+     // The remaining fields may be null on either side
+     return (principal == null ? that.principal == null : principal.equals(that.principal))
+             && (keytab == null ? that.keytab == null : keytab.equals(that.keytab))
+             && (haGroup == null ? that.haGroup == null : haGroup.equals(that.haGroup));
+ }
+
+ /**
+  * Hash over principal, keytab, HA group and user, in that order, consistent with
+  * {@link #equals(Object)}.
+  */
+ @Override
+ public int hashCode() {
+     int hash = 1;
+     // Nullable fields contribute 0 when absent
+     for (String field : new String[] { principal, keytab, haGroup }) {
+         hash = 31 * hash + (field == null ? 0 : field.hashCode());
+     }
+     // `user` is guaranteed to be non-null
+     return 31 * hash + user.hashCode();
+ }
+
+ /**
+  * Tells whether at least one of the given values is set.
+  * @param params values to inspect
+  * @return true if any element is non-null, false if all are null or none were given
+  */
+ protected boolean anyNotNull(Object... params) {
+     for (int i = 0; i < params.length; i++) {
+         if (params[i] != null) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+ * Parent of the Builder classes for the immutable ConnectionInfo classes
+ *
+ * @since
+ */
+ protected abstract static class Builder {
+
+ protected boolean isConnectionless;
+ protected String principal;
+ protected String keytab;
+ protected User user;
+ protected String haGroup;
+ protected boolean doNotLogin = false;
+
+ // Only used for building, not part of ConnectionInfo
+ protected final String url;
+ protected final Configuration config;
+ protected final ReadOnlyProps props;
+ protected final Properties info;
+
+ public Builder(String url, Configuration config, ReadOnlyProps props, Properties info) { // Keeps references to the inputs used while building; nothing is copied here.
+ this.config = config;
+ this.url = url;
+ this.props = props;
+ this.info = info;
+ }
+
+ protected abstract ConnectionInfo create() throws SQLException;
+
+ protected abstract void normalize() throws SQLException;
+
+ protected String get(String key, String defValue) {
+ String result = null;
+ if (info != null) {
+ result = info.getProperty(key);
+ }
+ if (result == null) {
+ if (props != null) {
+ result = props.get(key);
+ }
+ if (result == null) {
+ result = config.get(key, defValue);
+ }
+ }
+ return result;
+ }
+
+ protected String get(String key) { // Convenience overload: same lookup as get(key, defValue) with a null default.
+ return get(key, null);
+ }
+
+ /**
+  * Reads the HA group name from the JDBC connection properties. Leaves {@code haGroup}
+  * untouched when no connection properties were supplied.
+  */
+ protected void setHaGroup() {
+     if (info == null) {
+         return;
+     }
+     haGroup = info.getProperty(HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR);
+ }
+
+ protected void setDoNotLogin(boolean doNotLogin) { // When true, handleKerberosAndLogin() skips the login even if principal and keytab are set.
+ this.doNotLogin = doNotLogin;
+ }
+
+ protected void handleKerberosAndLogin() throws SQLException { // Resolves the current user and, when principal and keytab are available and login is not disabled, logs in from the keytab.
+ // Previously we have ignored the kerberos properties defined in hbase-site.xml,
+ // but now we use them (see the get() fallbacks for principal and keytab below)
+ try {
+ this.user = User.getCurrent();
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't get the current user!!", e);
+ }
+ if (null == this.user) {
+ throw new RuntimeException("Acquired null user which should never happen");
+ }
+
+ if (isConnectionless) { // connectionless: only the user is recorded, principal/keytab are not resolved
+ return;
+ }
+
+ if (principal == null) { // not set on the builder yet: fall back to info, then props, then config
+ principal = get(QueryServices.HBASE_CLIENT_PRINCIPAL);
+ }
+ if (keytab == null) { // same fallback chain as for the principal
+ keytab = get(QueryServices.HBASE_CLIENT_KEYTAB);
+ }
+ if ((principal == null) && (keytab != null)) { // a keytab without a principal is rejected
+ throw getMalFormedUrlException(url);
+ }
+ // We allow specifying a principal without a keytab, in which case
+ // the principal is not used for kerberos, but is set as the connection user
+ if (principal != null && keytab != null && !doNotLogin) {
+ // PHOENIX-3189 Because ConnectionInfo is immutable, we must make sure all parts of
+ // it are correct before
+ // construction; this also requires the Kerberos user credentials object (since they
+ // are compared by reference
+ // and not by value). If the user provided a principal and keytab via the JDBC url,
+ // we must make sure that the
+ // Kerberos login happens *before* we construct the ConnectionInfo object.
+ // Otherwise, the use of ConnectionInfo
+ // to determine when ConnectionQueryServices impl's should be reused will be broken.
+ try {
+ // Check if we need to authenticate with kerberos so that we cache the correct
+ // ConnectionInfo
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ if (!currentUser.hasKerberosCredentials()
+ || !isSameName(currentUser.getUserName(), principal)) {
+ synchronized (KERBEROS_LOGIN_LOCK) { // only one thread at a time may proceed to the login
+ // Double check the current user, might have changed since we checked
+ // last. Don't want
+ // to re-login if it's the same user.
+ currentUser = UserGroupInformation.getCurrentUser();
+ if (!currentUser.hasKerberosCredentials()
+ || !isSameName(currentUser.getUserName(), principal)) {
+ LOGGER.info("Trying to connect to a secure cluster as {} "
+ + "with keytab {}",
+ principal, keytab);
+ // We are intentionally changing the passed in Configuration object
+ if (null != principal) { // always true here, guarded by the enclosing condition
+ config.set(QueryServices.HBASE_CLIENT_PRINCIPAL, principal);
+ }
+ if (null != keytab) { // always true here, guarded by the enclosing condition
+ config.set(QueryServices.HBASE_CLIENT_KEYTAB, keytab);
+ }
+ UserGroupInformation.setConfiguration(config);
+ User.login(config, QueryServices.HBASE_CLIENT_KEYTAB,
+ QueryServices.HBASE_CLIENT_PRINCIPAL, null);
+ user = User.getCurrent(); // replace the user captured at the top with the one current after login
+ LOGGER.info("Successful login to secure cluster");
+ }
+ }
+ } else {
+ // The user already has Kerberos creds, so there isn't anything to change in
+ // the ConnectionInfo.
+ LOGGER.debug("Already logged in as {}", currentUser);
+ }
+ } catch (IOException e) { // failures while checking or logging in surface as CANNOT_ESTABLISH_CONNECTION
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
+ .setRootCause(e).build().buildException();
+ }
+ } else { // NOTE: also reached when only a principal is set, or when doNotLogin is true
+ LOGGER.debug("Principal and keytab not provided, not attempting Kerberos login");
+ }
+ }
+
+ /**
+  * Normalizes a comma separated list of {@code host} or {@code host:port} entries so that
+  * equivalent lists yield the same string: entries are trimmed, lower-cased, completed
+  * with the default port where none is given, and sorted.
+  * <p>
+  * HBase will perform a further reverse lookup based normalization on the hosts, but we
+  * skip that (TODO). In the unlikely worst case, we generate separate CQSI objects instead
+  * of sharing them.
+  *
+  * @param quorum comma separated list of hosts, each optionally followed by ":port"
+  * @param defaultPort port appended to entries that do not specify one
+  * @return the normalized, sorted, comma separated {@code host:port} list
+  * @throws SQLException (malformed URL) if an entry does not have one or two ":" separated
+  *         parts
+  */
+ protected String normalizeHostsList(String quorum, Integer defaultPort)
+         throws SQLException {
+     // Within an entry, host and port are separated by ":" at this point
+     String[] quorumParts = quorum.split(",");
+     String[] normalizedParts = new String[quorumParts.length];
+     for (int i = 0; i < quorumParts.length; i++) {
+         String[] hostAndPort = quorumParts[i].trim().split(":");
+         // Locale.ROOT keeps the result independent of the JVM default locale (for example
+         // the Turkish dotless i), so the same host always maps to the same key.
+         if (hostAndPort.length == 1) {
+             normalizedParts[i] =
+                     hostAndPort[0].trim().toLowerCase(java.util.Locale.ROOT) + ":"
+                             + defaultPort;
+         } else if (hostAndPort.length == 2) {
+             // Trim both parts, as the single-part branch does for the host
+             normalizedParts[i] =
+                     hostAndPort[0].trim().toLowerCase(java.util.Locale.ROOT) + ":"
+                             + hostAndPort[1].trim().toLowerCase(java.util.Locale.ROOT);
+         } else {
+             throw getMalFormedUrlException(url);
+         }
+     }
+     // We are sorting the host:port strings, so the sorting result may be unexpected, but
+     // as long as it's consistent, it doesn't matter.
+     Arrays.sort(normalizedParts);
+     return String.join(",", normalizedParts);
+ }
+
+ protected StringTokenizer getTokenizerWithoutProtocol() throws SQLException {
+ StringTokenizer tokenizer = new StringTokenizer(url, DELIMITERS, true);
+ try {
+ // Walk the first three tokens "jdbc", ":", "phoenix"/"phoenix+master"/"phoenix-zk"
+ // This should succeed, as we check for the "jdbc:phoenix" prefix when accepting the
+ // URL
+ if (!tokenizer.nextToken().toLowerCase().equals("jdbc")) {
+ throw new Exception();
+ }
+ if (!tokenizer.nextToken().toLowerCase().equals(":")) {
+ throw new Exception();
+ }
+ if (!tokenizer.nextToken().toLowerCase().startsWith("phoenix")) {
+ throw new Exception();
+ }
+ } catch (Exception e) {
+ throw getMalFormedUrlException(url);
+ }
+ return tokenizer;
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
index 75ac6cba82..fb2137d106 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
@@ -26,7 +26,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -357,14 +356,13 @@ public class HighAvailabilityGroup {
jdbcUrl = jdbcUrl.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
}
Preconditions.checkArgument(!StringUtils.isEmpty(jdbcUrl), "JDBC url is empty!");
- String[] urls = jdbcUrl.split(":");
- if (urls.length == 1) {
- zkUrl = String.format("%s:%s", urls[0], HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
- } else if (urls.length == 2 || urls.length == 3) {
- zkUrl = String.format("%s:%s", urls[0], urls[1]);
- } else {
+ jdbcUrl = jdbcUrl.replaceAll("\\\\:", "=");
+ String[] parts = jdbcUrl.split(":");
+ if (parts.length == 0 || parts.length > 3) {
throw new IllegalArgumentException("Invalid JDBC url!" + jdbcUrl);
}
+ // The URL is already normalised
+ zkUrl = parts[0].replaceAll("=", ":");
// Get timeout and retry counts
String connectionTimeoutMsProp = properties.getProperty(
@@ -568,7 +566,8 @@ public class HighAvailabilityGroup {
if (state != State.READY || connection == null) {
return false;
}
- return roleRecord.getActiveUrl().equals(Optional.of(connection.getURL()));
+ return roleRecord.getActiveUrl()
+ .equals(Optional.of(JDBCUtil.formatZookeeperUrl(connection.getURL())));
}
/**
@@ -583,8 +582,8 @@ public class HighAvailabilityGroup {
if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
Preconditions.checkArgument(url.length() > PhoenixRuntime.JDBC_PROTOCOL.length(),
"The URL '" + url + "' is not a valid Phoenix connection string");
- url = JDBCUtil.formatZookeeperUrl(url.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1));
}
+ url = JDBCUtil.formatZookeeperUrl(url);
Preconditions.checkArgument(url.equals(info.getUrl1()) || url.equals(info.getUrl2()),
"The URL '" + url + "' does not belong to this HA group " + info);
@@ -603,7 +602,7 @@ public class HighAvailabilityGroup {
Preconditions.checkArgument(driver instanceof PhoenixEmbeddedDriver,
"No JDBC driver is registered for Phoenix high availability (HA) framework");
return ((PhoenixEmbeddedDriver) driver).getConnectionQueryServices(jdbcString, properties)
- .connect(url, properties);
+ .connect(jdbcString, properties);
}
@VisibleForTesting
@@ -790,7 +789,7 @@ public class HighAvailabilityGroup {
Preconditions.checkArgument(zkUrl.equals(getUrl1()) || zkUrl.equals(getUrl2()),
"The URL '" + zkUrl + "' does not belong to this HA group " + this);
StringBuilder sb = new StringBuilder();
- sb.append(PhoenixRuntime.JDBC_PROTOCOL);
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL_ZK);
sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
sb.append(zkUrl);
if (!Strings.isNullOrEmpty(additionalJDBCParams)) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/MasterConnectionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/MasterConnectionInfo.java
new file mode 100644
index 0000000000..eef75e3402
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/MasterConnectionInfo.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+/**
+ * ConnectionInfo used for org.apache.hadoop.hbase.client.MasterRegistry
+ *
+ */
+public class MasterConnectionInfo extends AbstractRPCConnectionInfo {
+
+ private static final String MASTER_REGISTRY_CLASS_NAME =
+ "org.apache.hadoop.hbase.client.MasterRegistry";
+
+ protected MasterConnectionInfo(boolean isConnectionless, String principal, String keytab,
+ User user, String haGroup, String bootstrapServers) {
+ super(isConnectionless, principal, keytab, user, haGroup);
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ @Override
+ public ReadOnlyProps asProps() {
+ if (isConnectionless) {
+ return ReadOnlyProps.EMPTY_PROPS;
+ }
+
+ Map<String, String> connectionProps = getCommonProps();
+ connectionProps.put(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+ MASTER_REGISTRY_CLASS_NAME);
+
+ if (bootstrapServers != null) {
+ // This is already normalized to include ports
+ connectionProps.put(HConstants.MASTER_ADDRS_KEY, bootstrapServers);
+ }
+
+ return connectionProps.isEmpty() ? ReadOnlyProps.EMPTY_PROPS
+ : new ReadOnlyProps(connectionProps.entrySet().iterator());
+ }
+
+ @Override
+ public String toUrl() {
+ return PhoenixRuntime.JDBC_PROTOCOL_MASTER + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+ + toString();
+ }
+
+ public static boolean isMaster(Configuration config) {
+ // Default is handled by the caller
+ return config != null && MASTER_REGISTRY_CLASS_NAME
+ .equals(config.get(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY));
+ }
+
+ /**
+ * Builder class for MasterConnectionInfo
+ *
+ * @since 138
+ */
+ protected static class Builder extends AbstractRPCConnectionInfo.Builder {
+
+ public Builder(String url, Configuration config, ReadOnlyProps props, Properties info)
+ throws SQLException {
+ super(url, config, props, info);
+ if (!HAS_MASTER_REGISTRY) {
+ throw getMalFormedUrlException(
+ "HBase version does not support Master registry for: " + url);
+ }
+ }
+
+ @Override
+ protected void normalize() throws SQLException {
+ normalizeMaster();
+ }
+
+ @Override
+ protected ConnectionInfo build() {
+ return new MasterConnectionInfo(isConnectionless, principal, keytab, user, haGroup,
+ hostsList);
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java
index 276b5128df..3184af7adf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java
@@ -69,13 +69,13 @@ public class ParallelPhoenixConnection implements PhoenixMonitoredConnection {
CompletableFuture<PhoenixConnection> futureConnection2;
public ParallelPhoenixConnection(ParallelPhoenixContext context) throws SQLException {
this.context = context;
- LOG.trace("First Url: {} Second Url: {}", context.getHaGroup().getGroupInfo().getUrl1(),
- context.getHaGroup().getGroupInfo().getUrl2());
+ LOG.trace("First Url: {} Second Url: {}", context.getHaGroup().getGroupInfo().getJDBCUrl1(),
+ context.getHaGroup().getGroupInfo().getJDBCUrl2());
futureConnection1 = context.chainOnConn1(() -> getConnection(context.getHaGroup(),
- context.getHaGroup().getGroupInfo().getUrl1(),
+ context.getHaGroup().getGroupInfo().getJDBCUrl1(),
context.getProperties()));
futureConnection2 = context.chainOnConn2(() -> getConnection(context.getHaGroup(),
- context.getHaGroup().getGroupInfo().getUrl2(),
+ context.getHaGroup().getGroupInfo().getJDBCUrl2(),
context.getProperties()));
// Ensure one connection is successful before returning
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java
index 8df87493b8..567abad2dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java
@@ -116,6 +116,7 @@ public class ParallelPhoenixContext {
}
public Properties getProperties() {
+ //FIXME should return immutable
return properties;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index e67f0d6817..ca412d5238 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.PropertiesUtil;
/**
@@ -197,7 +198,7 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
if (result == null) {
synchronized(this) {
result = services;
- if(result == null) {
+ if (result == null) {
services = result = new QueryServicesImpl(getDefaultProps());
}
}
@@ -239,33 +240,34 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
}
@Override
- protected ConnectionQueryServices getConnectionQueryServices(String url, final Properties info) throws SQLException {
+ protected ConnectionQueryServices getConnectionQueryServices(String url, final Properties infoIn) throws SQLException {
lockInterruptibly(LockMode.READ);
try {
checkClosed();
- ConnectionInfo connInfo = ConnectionInfo.create(url);
SQLException sqlE = null;
boolean success = false;
final QueryServices services = getQueryServices();
ConnectionQueryServices connectionQueryServices = null;
// Also performs the Kerberos login if the URL/properties request this
- final ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps(), info);
+ final Properties info = PropertiesUtil.deepCopy(infoIn);
+ final ConnectionInfo connInfo = ConnectionInfo.create(url, services.getProps(), info);
+ //Set connection parameters to normalized value from URL
+ info.putAll(connInfo.asProps().asMap());
try {
connectionQueryServices =
- connectionQueryServicesCache.get(normalizedConnInfo, new Callable<ConnectionQueryServices>() {
+ connectionQueryServicesCache.get(connInfo, new Callable<ConnectionQueryServices>() {
@Override
public ConnectionQueryServices call() throws Exception {
ConnectionQueryServices connectionQueryServices;
- if (normalizedConnInfo.isConnectionless()) {
- connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info);
+ if (connInfo.isConnectionless()) {
+ connectionQueryServices = new ConnectionlessQueryServicesImpl(services, connInfo, info);
} else {
- connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info);
+ connectionQueryServices = new ConnectionQueryServicesImpl(services, connInfo, info);
}
return connectionQueryServices;
}
});
-
connectionQueryServices.init(url, info);
success = true;
} catch (ExecutionException ee){
@@ -281,7 +283,7 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
finally {
if (!success) {
// Remove from map, as initialization failed
- connectionQueryServicesCache.invalidate(normalizedConnInfo);
+ connectionQueryServicesCache.invalidate(connInfo);
if (sqlE != null) {
throw sqlE;
}
@@ -313,8 +315,7 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
* @throws SQLException if fails to generate key for CQS to invalidate
*/
void invalidateCache(String url, Properties properties) throws SQLException {
- ConnectionInfo connInfo = ConnectionInfo.create(url)
- .normalize(getQueryServices().getProps(), properties);
+ ConnectionInfo connInfo = ConnectionInfo.create(url, getQueryServices().getProps(), properties);
LOGGER.info("Invalidating the CQS from cache for connInfo={}", connInfo);
connectionQueryServicesCache.invalidate(connInfo);
LOGGER.debug(connectionQueryServicesCache.asMap().keySet().stream().map(Objects::toString).collect(Collectors.joining(",")));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index d97e88844c..46f499f28d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -19,40 +19,25 @@ package org.apache.phoenix.jdbc;
import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
-import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-import java.util.StringTokenizer;
-import java.util.Map.Entry;
import java.util.logging.Logger;
import javax.annotation.concurrent.Immutable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.HBaseFactoryProvider;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SQLCloseable;
-import org.slf4j.LoggerFactory;
-
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -70,11 +55,13 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
*/
private final static String DNC_JDBC_PROTOCOL_SUFFIX = "//";
private final static String DRIVER_NAME = "PhoenixEmbeddedDriver";
- private static final String TERMINATOR = "" + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
- private static final String DELIMITERS = TERMINATOR + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
private static final String TEST_URL_AT_END = "" + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
private static final String TEST_URL_IN_MIDDLE = TEST_URL_AT_END + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+ private static final String[] SUPPORTED_PROTOCOLS =
+ new String[] { PhoenixRuntime.JDBC_PROTOCOL, PhoenixRuntime.JDBC_PROTOCOL_ZK,
+ PhoenixRuntime.JDBC_PROTOCOL_MASTER, PhoenixRuntime.JDBC_PROTOCOL_RPC };
+
private final static DriverPropertyInfo[] EMPTY_INFO = new DriverPropertyInfo[0];
public final static String MAJOR_VERSION_PROP = "DriverMajorVersion";
public final static String MINOR_VERSION_PROP = "DriverMinorVersion";
@@ -85,7 +72,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
MAJOR_VERSION_PROP, Integer.toString(MetaDataProtocol.PHOENIX_MAJOR_VERSION),
MINOR_VERSION_PROP, Integer.toString(MetaDataProtocol.PHOENIX_MINOR_VERSION),
DRIVER_NAME_PROP, DRIVER_NAME));
-
+
PhoenixEmbeddedDriver() {
}
@@ -98,33 +85,38 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
@Override
public boolean acceptsURL(String url) throws SQLException {
if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
- // A connection string of "jdbc:phoenix" is supported, since
- // all the connection information can potentially be gotten
- // out of the HBase config file
- if (url.length() == PhoenixRuntime.JDBC_PROTOCOL.length()) {
- return true;
- }
- // Same as above, except for "jdbc:phoenix;prop=<value>..."
- if (PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR == url.charAt(PhoenixRuntime.JDBC_PROTOCOL.length())) {
- return true;
- }
- if (PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR == url.charAt(PhoenixRuntime.JDBC_PROTOCOL.length())) {
- int protoLength = PhoenixRuntime.JDBC_PROTOCOL.length() + 1;
- // A connection string of "jdbc:phoenix:" matches this driver,
- // but will end up as a MALFORMED_CONNECTION_URL exception later.
- if (url.length() == protoLength) {
- return true;
+ for (String protocol : SUPPORTED_PROTOCOLS) {
+ // A connection string of "jdbc:phoenix" is supported, since
+ // all the connection information can potentially be gotten
+ // out of the HBase config file
+ if (!url.startsWith(protocol)) {
+ continue;
}
- // Explicitly ignore connections of "jdbc:phoenix:thin"; leave them for
- // the thin client
- if (url.startsWith(PhoenixRuntime.JDBC_THIN_PROTOCOL)) {
- return false;
+ if (url.length() == protocol.length()) {
+ return true;
}
- // A connection string of the form "jdbc:phoenix://" means that
- // the driver is remote which isn't supported, so return false.
- if (!url.startsWith(DNC_JDBC_PROTOCOL_SUFFIX, protoLength)) {
+ // Same as above, except for "jdbc:phoenix;prop=<value>..."
+ if (PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR == url.charAt(protocol.length())) {
return true;
}
+ if (PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR == url.charAt(protocol.length())) {
+ int protoLength = protocol.length() + 1;
+ // A connection string of "jdbc:phoenix:" matches this driver,
+ // but will end up as a MALFORMED_CONNECTION_URL exception later.
+ if (url.length() == protoLength) {
+ return true;
+ }
+ // Explicitly ignore connections of "jdbc:phoenix:thin"; leave them for
+ // the thin client
+ if (url.startsWith(PhoenixRuntime.JDBC_THIN_PROTOCOL)) {
+ return false;
+ }
+ // A connection string of the form "jdbc:phoenix://" means that
+ // the driver is remote which isn't supported, so return false.
+ if (!url.startsWith(DNC_JDBC_PROTOCOL_SUFFIX, protoLength)) {
+ return true;
+ }
+ }
}
}
return false;
@@ -140,18 +132,19 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
}
protected final Connection createConnection(String url, Properties info) throws SQLException {
- Properties augmentedInfo = PropertiesUtil.deepCopy(info);
- augmentedInfo.putAll(getDefaultProps().asMap());
+ Properties augmentedInfo = PropertiesUtil.deepCopy(info);
+ augmentedInfo.putAll(getDefaultProps().asMap());
if (url.contains("|")) {
// High availability connection using two clusters
Optional<HighAvailabilityGroup> haGroup = HighAvailabilityGroup.get(url, augmentedInfo);
- if(haGroup.isPresent()){
+ if (haGroup.isPresent()) {
return haGroup.get().connect(augmentedInfo);
} else {
// If empty HA group is returned, fall back to single cluster.
- url = HighAvailabilityGroup.getFallbackCluster(url, info)
- .orElseThrow(() -> new SQLException(
- "HA group can not be initialized, not fallback to single cluster"));
+ url =
+ HighAvailabilityGroup.getFallbackCluster(url, info).orElseThrow(
+ () -> new SQLException(
+ "HA group can not be initialized, fallback to single cluster"));
}
}
ConnectionQueryServices cqs = getConnectionQueryServices(url, augmentedInfo);
@@ -198,485 +191,6 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable {
public void close() throws SQLException {
}
- /**
- *
- * Class to encapsulate connection info for HBase
- *
- *
- * @since 0.1.1
- */
- public static class ConnectionInfo {
- private static final org.slf4j.Logger LOGGER =
- LoggerFactory.getLogger(ConnectionInfo.class);
- private static final Object KERBEROS_LOGIN_LOCK = new Object();
- private static final char WINDOWS_SEPARATOR_CHAR = '\\';
- private static final String REALM_EQUIVALENCY_WARNING_MSG = "Provided principal does not contan a realm and the default realm cannot be determined. Ignoring realm equivalency check.";
- private static SQLException getMalFormedUrlException(String url) {
- return new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
- .setMessage(url).build().buildException();
- }
-
- public String getZookeeperConnectionString() {
- return getZookeeperQuorum() + ":" + getPort();
- }
-
- /**
- * Detect url with quorum:1,quorum:2 as HBase does not handle different port numbers
- * for different quorum hostnames.
- * @param portStr
- * @return
- */
- private static boolean isMultiPortUrl(String portStr) {
- int commaIndex = portStr.indexOf(',');
- if (commaIndex > 0) {
- try {
- Integer.parseInt(portStr.substring(0, commaIndex));
- return true;
- } catch (NumberFormatException otherE) {
- }
- }
- return false;
- }
-
- public static ConnectionInfo create(String url) throws SQLException {
- url = url == null ? "" : url;
- if (url.isEmpty() || url.equalsIgnoreCase("jdbc:phoenix:")
- || url.equalsIgnoreCase("jdbc:phoenix")) {
- return defaultConnectionInfo(url);
- }
- url = url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)
- ? url.substring(PhoenixRuntime.JDBC_PROTOCOL.length())
- : PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + url;
- StringTokenizer tokenizer = new StringTokenizer(url, DELIMITERS, true);
- int nTokens = 0;
- String[] tokens = new String[5];
- String token = null;
- while (tokenizer.hasMoreTokens() &&
- !(token=tokenizer.nextToken()).equals(TERMINATOR) &&
- tokenizer.hasMoreTokens() && nTokens < tokens.length) {
- token = tokenizer.nextToken();
- // This would mean we have an empty string for a token which is illegal
- if (DELIMITERS.contains(token)) {
- throw getMalFormedUrlException(url);
- }
- tokens[nTokens++] = token;
- }
- // Look-forward to see if the last token is actually the C:\\ path
- if (tokenizer.hasMoreTokens() && !TERMINATOR.equals(token)) {
- String extraToken = tokenizer.nextToken();
- if (WINDOWS_SEPARATOR_CHAR == extraToken.charAt(0)) {
- String prevToken = tokens[nTokens - 1];
- tokens[nTokens - 1] = prevToken + ":" + extraToken;
- if (tokenizer.hasMoreTokens() && !(token=tokenizer.nextToken()).equals(TERMINATOR)) {
- throw getMalFormedUrlException(url);
- }
- } else {
- throw getMalFormedUrlException(url);
- }
- }
- String quorum = null;
- Integer port = null;
- String rootNode = null;
- String principal = null;
- String keytabFile = null;
- int tokenIndex = 0;
- if (nTokens > tokenIndex) {
- quorum = tokens[tokenIndex++]; // Found quorum
- if (nTokens > tokenIndex) {
- try {
- port = Integer.parseInt(tokens[tokenIndex]);
- if (port < 0) {
- throw getMalFormedUrlException(url);
- }
- tokenIndex++; // Found port
- } catch (NumberFormatException e) { // No port information
- if (isMultiPortUrl(tokens[tokenIndex])) {
- throw getMalFormedUrlException(url);
- }
- }
- if (nTokens > tokenIndex) {
- if (tokens[tokenIndex].startsWith("/")) {
- rootNode = tokens[tokenIndex++]; // Found rootNode
- }
- if (nTokens > tokenIndex) {
- principal = tokens[tokenIndex++]; // Found principal
- if (nTokens > tokenIndex) {
- keytabFile = tokens[tokenIndex++]; // Found keytabFile
- // There's still more after, try to see if it's a windows file path
- if (tokenIndex < tokens.length) {
- String nextToken = tokens[tokenIndex++];
- // The next token starts with the directory separator, assume
- // it's still the keytab path.
- if (null != nextToken && WINDOWS_SEPARATOR_CHAR == nextToken.charAt(0)) {
- keytabFile = keytabFile + ":" + nextToken;
- }
- }
- }
- }
- }
- }
- }
- return new ConnectionInfo(quorum,port,rootNode, principal, keytabFile);
- }
-
- public ConnectionInfo normalize(ReadOnlyProps props, Properties info) throws SQLException {
- String zookeeperQuorum = this.getZookeeperQuorum();
- Integer port = this.getPort();
- String rootNode = this.getRootNode();
- String keytab = this.getKeytab();
- String principal = this.getPrincipal();
- // Normalize connInfo so that a url explicitly specifying versus implicitly inheriting
- // the default values will both share the same ConnectionQueryServices.
- if (zookeeperQuorum == null) {
- zookeeperQuorum = props.get(QueryServices.ZOOKEEPER_QUORUM_ATTRIB);
- if (zookeeperQuorum == null) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
- .setMessage(this.toString()).build().buildException();
- }
- }
-
- if (port == null) {
- if (!isConnectionless) {
- String portStr = props.get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
- if (portStr != null) {
- try {
- port = Integer.parseInt(portStr);
- } catch (NumberFormatException e) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
- .setMessage(this.toString()).build().buildException();
- }
- }
- }
- } else if (isConnectionless) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
- .setMessage("Port may not be specified when using the connectionless url \"" + this.toString() + "\"").build().buildException();
- }
- if (rootNode == null) {
- if (!isConnectionless) {
- rootNode = props.get(QueryServices.ZOOKEEPER_ROOT_NODE_ATTRIB);
- }
- } else if (isConnectionless) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
- .setMessage("Root node may not be specified when using the connectionless url \"" + this.toString() + "\"").build().buildException();
- }
- if(principal == null){
- if (!isConnectionless) {
- principal = props.get(QueryServices.HBASE_CLIENT_PRINCIPAL);
- }
- }
- if(keytab == null){
- if (!isConnectionless) {
- keytab = props.get(QueryServices.HBASE_CLIENT_KEYTAB);
- }
- }
- if (!isConnectionless()) {
- boolean credsProvidedInUrl = null != principal && null != keytab;
- boolean credsProvidedInProps = info.containsKey(QueryServices.HBASE_CLIENT_PRINCIPAL) && info.containsKey(QueryServices.HBASE_CLIENT_KEYTAB);
- if (credsProvidedInUrl || credsProvidedInProps) {
- // PHOENIX-3189 Because ConnectionInfo is immutable, we must make sure all parts of it are correct before
- // construction; this also requires the Kerberos user credentials object (since they are compared by reference
- // and not by value. If the user provided a principal and keytab via the JDBC url, we must make sure that the
- // Kerberos login happens *before* we construct the ConnectionInfo object. Otherwise, the use of ConnectionInfo
- // to determine when ConnectionQueryServices impl's should be reused will be broken.
- try {
- // Check if we need to authenticate with kerberos so that we cache the correct ConnectionInfo
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- if (!currentUser.hasKerberosCredentials() || !isSameName(currentUser.getUserName(), principal)) {
- synchronized (KERBEROS_LOGIN_LOCK) {
- // Double check the current user, might have changed since we checked last. Don't want
- // to re-login if it's the same user.
- currentUser = UserGroupInformation.getCurrentUser();
- if (!currentUser.hasKerberosCredentials() || !isSameName(currentUser.getUserName(), principal)) {
- final Configuration config = getConfiguration(props, info, principal, keytab);
- LOGGER.info("Trying to connect to a secure cluster as {} " +
- "with keytab {}",
- config.get(QueryServices.HBASE_CLIENT_PRINCIPAL),
- config.get(QueryServices.HBASE_CLIENT_KEYTAB));
- UserGroupInformation.setConfiguration(config);
- User.login(config, QueryServices.HBASE_CLIENT_KEYTAB, QueryServices.HBASE_CLIENT_PRINCIPAL, null);
- LOGGER.info("Successful login to secure cluster");
- }
- }
- } else {
- // The user already has Kerberos creds, so there isn't anything to change in the ConnectionInfo.
- LOGGER.debug("Already logged in as {}", currentUser);
- }
- } catch (IOException e) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
- .setRootCause(e).build().buildException();
- }
- } else {
- LOGGER.debug("Principal and keytab not provided, not attempting Kerberos login");
- }
- } // else, no connection, no need to login
- // Will use the current User from UGI
- String haGroup = info.getProperty(HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR);
- return new ConnectionInfo(zookeeperQuorum, port, rootNode, principal, keytab, haGroup);
-
- }
-
- // Visible for testing
- static boolean isSameName(String currentName, String newName) throws IOException {
- return isSameName(currentName, newName, null, getDefaultKerberosRealm());
- }
-
- /**
- * Computes the default kerberos realm if one is available. If one cannot be computed, null
- * is returned.
- *
- * @return The default kerberos realm, or null.
- */
- static String getDefaultKerberosRealm() {
- try {
- return KerberosUtil.getDefaultRealm();
- } catch (Exception e) {
- if (LOGGER.isDebugEnabled()) {
- // Include the stacktrace at DEBUG
- LOGGER.debug(REALM_EQUIVALENCY_WARNING_MSG, e);
- } else {
- // Limit the content at WARN
- LOGGER.warn(REALM_EQUIVALENCY_WARNING_MSG);
- }
- }
- return null;
- }
-
- static boolean isSameName(String currentName, String newName, String hostname) throws IOException {
- return isSameName(currentName, newName, hostname, getDefaultKerberosRealm());
- }
-
- static boolean isSameName(String currentName, String newName, String hostname, String defaultRealm) throws IOException {
- final boolean newNameContainsRealm = newName.indexOf('@') != -1;
- // Make sure to replace "_HOST" if it exists before comparing the principals.
- if (newName.contains(org.apache.hadoop.security.SecurityUtil.HOSTNAME_PATTERN)) {
- if (newNameContainsRealm) {
- newName = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(newName, hostname);
- } else {
- // If the principal ends with "/_HOST", replace "_HOST" with the hostname.
- if (newName.endsWith("/_HOST")) {
- newName = newName.substring(0, newName.length() - 5) + hostname;
- }
- }
- }
- // The new name doesn't contain a realm and we could compute a default realm
- if (!newNameContainsRealm && defaultRealm != null) {
- return currentName.equals(newName + "@" + defaultRealm);
- }
- // We expect both names to contain a realm, so we can do a simple equality check
- return currentName.equals(newName);
- }
-
- /**
- * Constructs a Configuration object to use when performing a Kerberos login.
- * @param props QueryServices properties
- * @param info User-provided properties
- * @param principal Kerberos user principal
- * @param keytab Path to Kerberos user keytab
- * @return Configuration object suitable for Kerberos login
- */
- private Configuration getConfiguration(ReadOnlyProps props, Properties info, String principal, String keytab) {
- final Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
- // Add QueryServices properties
- for (Entry<String,String> entry : props) {
- config.set(entry.getKey(), entry.getValue());
- }
- // Add any user-provided properties (via DriverManager)
- if (info != null) {
- for (Object key : info.keySet()) {
- config.set((String) key, info.getProperty((String) key));
- }
- }
- // Set the principal and keytab if provided from the URL (overriding those provided in Properties)
- if (null != principal) {
- config.set(QueryServices.HBASE_CLIENT_PRINCIPAL, principal);
- }
- if (null != keytab) {
- config.set(QueryServices.HBASE_CLIENT_KEYTAB, keytab);
- }
- return config;
- }
-
- private final Integer port;
- private final String rootNode;
- private final String zookeeperQuorum;
- private final boolean isConnectionless;
- private final String principal;
- private final String keytab;
- private final User user;
- private final String haGroup;
-
- public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode, String principal, String keytab, String haGroup) {
- this.zookeeperQuorum = zookeeperQuorum;
- this.port = port;
- this.rootNode = rootNode;
- this.isConnectionless = PhoenixRuntime.CONNECTIONLESS.equals(zookeeperQuorum);
- this.principal = principal;
- this.keytab = keytab;
- try {
- this.user = User.getCurrent();
- } catch (IOException e) {
- throw new RuntimeException("Couldn't get the current user!!", e);
- }
- if (null == this.user) {
- throw new RuntimeException("Acquired null user which should never happen");
- }
- this.haGroup = haGroup;
- }
-
- public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode, String principal, String keytab) {
- this(zookeeperQuorum, port, rootNode, principal, keytab, null);
- }
-
- public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode) {
- this(zookeeperQuorum, port, rootNode, null, null);
- }
-
- /**
- * Copy constructor for all members except {@link #user}.
- *
- * @param other The instance to copy
- */
- public ConnectionInfo(ConnectionInfo other) {
- this(other.zookeeperQuorum, other.port, other.rootNode, other.principal, other.keytab, other.haGroup);
- }
-
- public ReadOnlyProps asProps() {
- Map<String, String> connectionProps = Maps.newHashMapWithExpectedSize(3);
- if (getZookeeperQuorum() != null) {
- connectionProps.put(QueryServices.ZOOKEEPER_QUORUM_ATTRIB, getZookeeperQuorum());
- }
- if (getPort() != null) {
- connectionProps.put(QueryServices.ZOOKEEPER_PORT_ATTRIB, getPort().toString());
- }
- if (getRootNode() != null) {
- connectionProps.put(QueryServices.ZOOKEEPER_ROOT_NODE_ATTRIB, getRootNode());
- }
- if (getPrincipal() != null && getKeytab() != null) {
- connectionProps.put(QueryServices.HBASE_CLIENT_PRINCIPAL, getPrincipal());
- connectionProps.put(QueryServices.HBASE_CLIENT_KEYTAB, getKeytab());
- }
- if (getHaGroup()!= null) {
- connectionProps.put(QueryServices.HA_GROUP_NAME_ATTRIB, getRootNode());
- }
-
- return connectionProps.isEmpty() ? ReadOnlyProps.EMPTY_PROPS : new ReadOnlyProps(
- connectionProps.entrySet().iterator());
- }
-
- public boolean isConnectionless() {
- return isConnectionless;
- }
-
- public String getZookeeperQuorum() {
- return zookeeperQuorum;
- }
-
- public Integer getPort() {
- return port;
- }
-
- public String getRootNode() {
- return rootNode;
- }
-
- public String getKeytab() {
- return keytab;
- }
-
- public String getPrincipal() {
- return principal;
- }
-
- public User getUser() {
- return user;
- }
-
- public String getHaGroup() {
- return haGroup;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((zookeeperQuorum == null) ? 0 : zookeeperQuorum.hashCode());
- result = prime * result + ((port == null) ? 0 : port.hashCode());
- result = prime * result + ((rootNode == null) ? 0 : rootNode.hashCode());
- result = prime * result + ((principal == null) ? 0 : principal.hashCode());
- result = prime * result + ((keytab == null) ? 0 : keytab.hashCode());
- result = prime * result + ((haGroup == null) ? 0 : haGroup.hashCode());
- // `user` is guaranteed to be non-null
- result = prime * result + user.hashCode();
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (obj == null) return false;
- if (getClass() != obj.getClass()) return false;
- ConnectionInfo other = (ConnectionInfo) obj;
- // `user` is guaranteed to be non-null
- if (!other.user.equals(user)) return false;
- if (zookeeperQuorum == null) {
- if (other.zookeeperQuorum != null) return false;
- } else if (!zookeeperQuorum.equals(other.zookeeperQuorum)) return false;
- if (port == null) {
- if (other.port != null) return false;
- } else if (!port.equals(other.port)) return false;
- if (rootNode == null) {
- if (other.rootNode != null) return false;
- } else if (!rootNode.equals(other.rootNode)) return false;
- if (principal == null) {
- if (other.principal != null) return false;
- } else if (!principal.equals(other.principal)) return false;
- if (keytab == null) {
- if (other.keytab != null) return false;
- } else if (!keytab.equals(other.keytab)) return false;
- if (haGroup == null) {
- if (other.haGroup != null) return false;
- } else if (!haGroup.equals(other.haGroup)) return false;
- return true;
- }
-
- @Override
- public String toString() {
- return zookeeperQuorum + (port == null ? "" : ":" + port)
- + (rootNode == null ? "" : ":" + rootNode)
- + (principal == null ? "" : ":" + principal)
- + (keytab == null ? "" : ":" + keytab)
- + (haGroup == null ? "" : ":" + haGroup);
-
- }
-
- public String toUrl() {
- // Some tests like PhoenixConfigurationUtilTest add JDBC_PROTOCOL to zookeeper quorum
- if (toString().contains(PhoenixRuntime.JDBC_PROTOCOL)) {
- return toString();
- } else {
- return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
- + toString();
- }
- }
-
- private static ConnectionInfo defaultConnectionInfo(String url) throws SQLException {
- Configuration config =
- HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
- String quorum = config.get(HConstants.ZOOKEEPER_QUORUM);
- if (quorum == null || quorum.isEmpty()) {
- throw getMalFormedUrlException(url);
- }
- String clientPort = config.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- Integer port = clientPort==null ? null : Integer.parseInt(clientPort);
- if (port == null || port < 0) {
- throw getMalFormedUrlException(url);
- }
- String znodeParent = config.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
- LOGGER.debug("Getting default jdbc connection url "
- + quorum + ":" + port + ":" + znodeParent);
- return new ConnectionInfo(quorum, port, znodeParent);
- }
- }
public static boolean isTestUrl(String url) {
return url.endsWith(TEST_URL_AT_END) || url.contains(TEST_URL_IN_MIDDLE);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
index a47a5aac1f..c6bdadc335 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.util.JDBCUtil;
import org.apache.phoenix.util.JacksonUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -329,7 +330,7 @@ public class PhoenixHAAdminTool extends Configured implements Tool {
Preconditions.checkNotNull(zkUrl);
Preconditions.checkNotNull(conf);
Preconditions.checkNotNull(highAvailibilityCuratorProvider);
- this.zkUrl = zkUrl;
+ this.zkUrl = JDBCUtil.formatZookeeperUrl(zkUrl);
this.conf = conf;
conf.iterator().forEachRemaining(k -> properties.setProperty(k.getKey(), k.getValue()));
this.highAvailibilityCuratorProvider = highAvailibilityCuratorProvider;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/RPCConnectionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/RPCConnectionInfo.java
new file mode 100644
index 0000000000..9baeb66b30
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/RPCConnectionInfo.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+/**
+ * ConnectionInfo class for org.apache.hadoop.hbase.client.RpcConnectionRegistry
+ *
+ * <p>Instances are created through the nested {@link Builder}, which normalizes the server list
+ * taken from the URL (or from the configuration) before the object is built.
+ *
+ * @since 138
+ */
+public class RPCConnectionInfo extends AbstractRPCConnectionInfo {
+
+    // We may be on an older HBase version, which does not even have RpcConnectionRegistry,
+    // so the property and class names are spelled out here.
+    private static final String BOOTSTRAP_NODES = "hbase.client.bootstrap.servers";
+    private static final String RPC_REGISTRY_CLASS_NAME =
+            "org.apache.hadoop.hbase.client.RpcConnectionRegistry";
+
+    /**
+     * @param isConnectionless passed through to the parent class
+     * @param principal passed through to the parent class
+     * @param keytab passed through to the parent class
+     * @param user passed through to the parent class
+     * @param haGroup passed through to the parent class
+     * @param bootstrapServers the server list as produced by {@link Builder#normalize()}
+     */
+    protected RPCConnectionInfo(boolean isConnectionless, String principal, String keytab,
+            User user, String haGroup, String bootstrapServers) {
+        super(isConnectionless, principal, keytab, user, haGroup);
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    /**
+     * Returns the HBase client properties that describe this connection.
+     *
+     * @return empty props for connectionless connections; otherwise the common props plus the
+     *         registry implementation class and, when known, the server list
+     */
+    @Override
+    public ReadOnlyProps asProps() {
+        if (isConnectionless) {
+            return ReadOnlyProps.EMPTY_PROPS;
+        }
+
+        Map<String, String> connectionProps = getCommonProps();
+
+        connectionProps.put(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+            RPC_REGISTRY_CLASS_NAME);
+
+        if (getBoostrapServers() != null) {
+            // This is already normalized to include ports.
+            // RpcConnectionRegistry reads BOOTSTRAP_NODES first and only falls back to the
+            // master addresses when that key is unset, so the list must be written to
+            // BOOTSTRAP_NODES for the URL to win over a value already present in the
+            // configuration.
+            connectionProps.put(BOOTSTRAP_NODES, bootstrapServers);
+            // Kept for backward compatibility with consumers that read the master addresses.
+            connectionProps.put(HConstants.MASTER_ADDRS_KEY, bootstrapServers);
+        }
+
+        return connectionProps.isEmpty() ? ReadOnlyProps.EMPTY_PROPS
+                : new ReadOnlyProps(connectionProps.entrySet().iterator());
+    }
+
+    /**
+     * Combines the parent's hash with the server list, mirroring {@link #equals(Object)},
+     * which also delegates to the parent before comparing the server list.
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = super.hashCode();
+        result = prime * result + ((bootstrapServers == null) ? 0 : bootstrapServers.hashCode());
+        // Port is already provided in or normalized into bootstrapServers
+        return result;
+    }
+
+    /**
+     * Two instances are equal when the parent considers them equal and their server lists match.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (!super.equals(obj)) {
+            return false;
+        }
+        // NOTE(review): the cast assumes super.equals() has already rejected null and
+        // objects of a different class - confirm in the parent class.
+        RPCConnectionInfo other = (RPCConnectionInfo) obj;
+        if (bootstrapServers == null) {
+            if (other.bootstrapServers != null) {
+                return false;
+            }
+        } else if (!bootstrapServers.equals(other.bootstrapServers)) {
+            return false;
+        }
+        // Port is already provided in or normalized into bootstrapServers
+        return true;
+    }
+
+    /**
+     * @return the RPC protocol prefix, the protocol separator and {@link #toString()}
+     */
+    @Override
+    public String toUrl() {
+        return PhoenixRuntime.JDBC_PROTOCOL_RPC + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+                + toString();
+    }
+
+    /**
+     * @param config the configuration to inspect, may be null
+     * @return true if the configuration explicitly selects the RPC connection registry
+     */
+    public static boolean isRPC(Configuration config) {
+        // Default is handled by the caller
+        return config != null && RPC_REGISTRY_CLASS_NAME
+                .equals(config.get(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY));
+    }
+
+    /**
+     * Builder for RPCConnectionInfo.
+     */
+    protected static class Builder extends AbstractRPCConnectionInfo.Builder {
+
+        /**
+         * @throws SQLException if the parent constructor fails, or if HAS_RPC_REGISTRY is false
+         */
+        public Builder(String url, Configuration config, ReadOnlyProps props, Properties info)
+                throws SQLException {
+            super(url, config, props, info);
+            if (!HAS_RPC_REGISTRY) {
+                throw getMalFormedUrlException(
+                    "Hbase version does not support RPC registry for: " + url);
+            }
+        }
+
+        /**
+         * Resolves the server list and port parsed from the URL.
+         *
+         * <p>An empty host list or port is treated as absent. Without a host list in the URL the
+         * configured bootstrap servers are used, and if there are none either, normalization is
+         * delegated to {@code normalizeMaster()}.
+         *
+         * @throws SQLException if the port is not a non-negative integer, or if a port is given
+         *         for a connectionless URL
+         */
+        @Override
+        protected void normalize() throws SQLException {
+            if (hostsList != null && hostsList.isEmpty()) {
+                hostsList = null;
+            }
+            if (portString != null && portString.isEmpty()) {
+                portString = null;
+            }
+
+            // We don't have a default port for RPC Connections
+            // Well, we do if we fall back to Master
+            boolean noServerListinURL = false;
+            if (hostsList == null) {
+                hostsList = getBootstrapServerAddr();
+                noServerListinURL = true;
+                if (hostsList == null) {
+                    // Fall back to MasterRegistry behaviour
+                    normalizeMaster();
+                    return;
+                }
+            } else {
+                hostsList = hostsList.replace('=', ':');
+            }
+
+            if (portString != null) {
+                try {
+                    port = Integer.parseInt(portString);
+                } catch (NumberFormatException e) {
+                    throw getMalFormedUrlException(url);
+                }
+                if (port < 0) {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+
+            if (isConnectionless) {
+                // We probably don't create connectionless RPCConnectionInfo objects
+                if (port != null) {
+                    throw getMalFormedUrlException(url);
+                } else {
+                    return;
+                }
+            }
+
+            // RpcConnectionRegistry doesn't have a default port property, but we accept the
+            // legacy format from the URL if both host list and port are provided
+            if (port != null && !noServerListinURL) {
+                hostsList = normalizeHostsList(hostsList, port);
+            }
+        }
+
+        /**
+         * @return the configured value of {@code hbase.client.bootstrap.servers}, or null if it
+         *         is unset or empty
+         */
+        public String getBootstrapServerAddr() {
+            String configuredBootstrapNodes = get(BOOTSTRAP_NODES);
+            return Strings.isNullOrEmpty(configuredBootstrapNodes)
+                    ? null : configuredBootstrapNodes;
+        }
+
+        @Override
+        protected ConnectionInfo build() {
+            return new RPCConnectionInfo(isConnectionless, principal, keytab, user, haGroup,
+                    hostsList);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ZKConnectionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ZKConnectionInfo.java
new file mode 100644
index 0000000000..28743ba5ee
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/ZKConnectionInfo.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+
+/**
+ * ConnectionInfo class for org.apache.hadoop.hbase.client.ZKConnectionRegistry
+ *
+ * This used to be the only supported Registry in Phoenix (and the only one implemented by HBase)
+ *
+ * <p>Instances are created through the nested {@link Builder}, which parses the URL and
+ * normalizes the ZooKeeper port into the host list.
+ */
+public class ZKConnectionInfo extends ConnectionInfo {
+
+    private static final String ZK_REGISTRY_NAME =
+            "org.apache.hadoop.hbase.client.ZKConnectionRegistry";
+
+    private final Integer zkPort;
+    private final String zkRootNode;
+    private final String zkHosts;
+
+    private ZKConnectionInfo(boolean isConnectionless, String principal, String keytab, User user,
+            String haGroup, String zkHosts, Integer zkPort, String zkRootNode) {
+        super(isConnectionless, principal, keytab, user, haGroup);
+        this.zkPort = zkPort;
+        this.zkRootNode = zkRootNode;
+        this.zkHosts = zkHosts;
+    }
+
+    /** @return the ZooKeeper host list as stored by the builder */
+    public String getZkHosts() {
+        return zkHosts;
+    }
+
+    /** @return the ZooKeeper port; the builder sets it to null once folded into the host list */
+    public Integer getZkPort() {
+        return zkPort;
+    }
+
+    /** @return the ZooKeeper root node */
+    public String getZkRootNode() {
+        return zkRootNode;
+    }
+
+    @Override
+    public String getZookeeperConnectionString() {
+        // Normalized form includes ports
+        return getZkHosts();
+    }
+
+    /**
+     * Returns the HBase client properties that describe this connection.
+     *
+     * @return empty props for connectionless connections; otherwise the common props plus the
+     *         registry implementation class, the quorum and the root node when they are set
+     */
+    @Override
+    public ReadOnlyProps asProps() {
+        if (isConnectionless) {
+            return ReadOnlyProps.EMPTY_PROPS;
+        }
+
+        Map<String, String> connectionProps = getCommonProps();
+        connectionProps.put(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+            ZK_REGISTRY_NAME);
+
+        if (getZkHosts() != null) {
+            //This has the highest priority
+            connectionProps.put(HConstants.CLIENT_ZOOKEEPER_QUORUM, getZkHosts());
+        }
+        //Port is already normalized into zkHosts
+        if (getZkRootNode() != null) {
+            connectionProps.put(HConstants.ZOOKEEPER_ZNODE_PARENT, getZkRootNode());
+        }
+        return connectionProps.isEmpty() ? ReadOnlyProps.EMPTY_PROPS
+                : new ReadOnlyProps(connectionProps.entrySet().iterator());
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((zkHosts == null) ? 0 : zkHosts.hashCode());
+        //Port is already included in zkHosts
+        result = prime * result + ((zkRootNode == null) ? 0 : zkRootNode.hashCode());
+        result = prime * result + super.hashCode();
+        return result;
+    }
+
+    /**
+     * Two instances are equal when the parent considers them equal and both the host list and
+     * the root node match.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (!super.equals(obj)) {
+            return false;
+        }
+        // NOTE(review): the cast assumes super.equals() has already rejected null and
+        // objects of a different class - confirm in ConnectionInfo.
+        ZKConnectionInfo other = (ZKConnectionInfo) obj;
+        if (zkHosts == null) {
+            if (other.zkHosts != null) {
+                return false;
+            }
+        } else if (!zkHosts.equals(other.zkHosts)) {
+            return false;
+        }
+        //Port is already normalized into zkHosts
+        if (zkRootNode == null) {
+            if (other.zkRootNode != null) {
+                return false;
+            }
+        } else if (!zkRootNode.equals(other.zkRootNode)) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Renders the colon separated fields (hosts, port, root node, principal, keytab).
+     * Trailing absent fields are dropped, absent fields in the middle are left empty.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        // Every ':' inside the host list is prefixed with a backslash; presumably so that the
+        // host:port pairs are not mistaken for field separators when the URL is parsed again.
+        sb.append(zkHosts.replaceAll(":", "\\\\:"));
+        if (anyNotNull(zkPort, zkRootNode, principal, keytab)) {
+            sb.append(zkPort == null ? ":" : ":" + zkPort);
+        }
+        if (anyNotNull(zkRootNode, principal, keytab)) {
+            sb.append(zkRootNode == null ? ":" : ":" + zkRootNode);
+        }
+        if (anyNotNull(principal, keytab)) {
+            sb.append(principal == null ? ":" : ":" + principal);
+        }
+        if (anyNotNull(keytab)) {
+            sb.append(keytab == null ? ":" : ":" + keytab);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * @return the ZK protocol prefix, the protocol separator and {@link #toString()}
+     */
+    @Override
+    public String toUrl() {
+        return PhoenixRuntime.JDBC_PROTOCOL_ZK + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+                + toString();
+    }
+
+    /**
+     * Builder helper class for ZKConnectionInfo
+     *
+     */
+    protected static class Builder extends ConnectionInfo.Builder {
+        private Integer zkPort;
+        private String zkRootNode;
+        private String zkHosts;
+
+        public Builder(String url, Configuration config, ReadOnlyProps props, Properties info) {
+            super(url, config, props, info);
+        }
+
+        /**
+         * Parses and normalizes the URL, performs the Kerberos handling, sets the HA group and
+         * builds the connection info, in that order.
+         */
+        @Override
+        protected ConnectionInfo create() throws SQLException {
+            parse();
+            normalize();
+            handleKerberosAndLogin();
+            setHaGroup();
+            return build();
+        }
+
+        /**
+         * Detect url with quorum:1,quorum:2 as HBase does not handle different port numbers
+         * for different quorum hostnames.
+         * @param portStr the token found where the port is expected, must not be null
+         * @return true if the text before the first comma of the token is an integer
+         */
+        private boolean isMultiPortUrl(String portStr) {
+            int commaIndex = portStr.indexOf(',');
+            if (commaIndex > 0) {
+                try {
+                    Integer.parseInt(portStr.substring(0, commaIndex));
+                    return true;
+                } catch (NumberFormatException ignored) {
+                    // Not a number before the comma, so this is not a port list
+                }
+            }
+            return false;
+        }
+
+        /**
+         * Splits the URL into at most five fields and assigns them, in order, to the quorum,
+         * the optional port, the optional root node, the principal and the keytab.
+         *
+         * @throws SQLException if the URL has more fields than expected, a negative port, or
+         *         per-host ports in the port position
+         */
+        private void parse() throws SQLException {
+            StringTokenizer tokenizer = getTokenizerWithoutProtocol();
+            int nTokens = 0;
+            String[] tokens = new String[5];
+            String token = null;
+            boolean wasDelimiter = false;
+            boolean first = true;
+            while (tokenizer.hasMoreTokens() && !(token = tokenizer.nextToken()).equals(TERMINATOR)
+                    && nTokens < tokens.length) {
+                if (DELIMITERS.contains(token)) {
+                    // A delimiter directly after another delimiter is an omitted field;
+                    // record it as an empty token so that later fields keep their position.
+                    if (wasDelimiter && !first) {
+                        tokens[nTokens++] = "";
+                    }
+                    wasDelimiter = true;
+                } else {
+                    tokens[nTokens++] = token;
+                    wasDelimiter = false;
+                }
+                first = false;
+            }
+            // Look-forward to see if the last token is actually the C:\\ path
+            if (tokenizer.hasMoreTokens() && !TERMINATOR.equals(token)) {
+                String extraToken = tokenizer.nextToken();
+                if (WINDOWS_SEPARATOR_CHAR == extraToken.charAt(0)) {
+                    String prevToken = tokens[nTokens - 1];
+                    tokens[nTokens - 1] = prevToken + ":" + extraToken;
+                    if (tokenizer.hasMoreTokens()
+                            && !(token = tokenizer.nextToken()).equals(TERMINATOR)) {
+                        throw getMalFormedUrlException(url);
+                    }
+                } else {
+                    throw getMalFormedUrlException(url);
+                }
+            }
+            int tokenIndex = 0;
+            if (nTokens > tokenIndex) {
+                zkHosts = tokens[tokenIndex++]; // Found quorum
+                if (nTokens > tokenIndex) {
+                    try {
+                        zkPort = Integer.parseInt(tokens[tokenIndex]);
+                        if (zkPort < 0) {
+                            throw getMalFormedUrlException(url);
+                        }
+                        tokenIndex++; // Found port
+                    } catch (NumberFormatException e) { // No port information
+                        if (tokens[tokenIndex].isEmpty()) {
+                            tokenIndex++; // Found empty port
+                        }
+                        // The empty port may have been the last token, in which case there
+                        // is nothing left to inspect (tokens[tokenIndex] would be null).
+                        if (tokenIndex < nTokens && isMultiPortUrl(tokens[tokenIndex])) {
+                            throw getMalFormedUrlException(url);
+                        }
+                        // Otherwise assume port is simply omitted
+                    }
+                    if (nTokens > tokenIndex) {
+                        if (tokens[tokenIndex].startsWith("/") || tokens[tokenIndex].isEmpty()) {
+                            zkRootNode = tokens[tokenIndex++]; // Found rootNode
+                        }
+                        if (nTokens > tokenIndex) {
+                            principal = tokens[tokenIndex++]; // Found principal
+                            if (nTokens > tokenIndex) {
+                                keytab = tokens[tokenIndex++]; // Found keytabFile
+                                // There's still more after, try to see if it's a windows file path
+                                if (tokenIndex < tokens.length) {
+                                    String nextToken = tokens[tokenIndex++];
+                                    // The next token starts with the directory separator, assume
+                                    // it's still the keytab path. Unused slots are null and
+                                    // omitted fields are empty, neither has a first character.
+                                    if (null != nextToken && !nextToken.isEmpty()
+                                            && WINDOWS_SEPARATOR_CHAR == nextToken.charAt(0)) {
+                                        keytab = keytab + ":" + nextToken;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        protected ConnectionInfo build() {
+            return new ZKConnectionInfo(isConnectionless, principal, keytab, user, haGroup, zkHosts,
+                    zkPort, zkRootNode);
+        }
+
+        /**
+         * Fills in the fields missing from the URL and folds the port into the host list.
+         *
+         * @throws SQLException if a connectionless URL carries a port or root node, if no quorum
+         *         can be determined, or if the configured client port is not an integer
+         */
+        @Override
+        protected void normalize() throws SQLException {
+            // Treat empty as null
+            if (zkHosts != null && zkHosts.isEmpty()) {
+                zkHosts = null;
+            }
+            if (zkRootNode != null && zkRootNode.isEmpty()) {
+                zkRootNode = null;
+            }
+            isConnectionless = PhoenixRuntime.CONNECTIONLESS.equals(zkHosts);
+
+            if (isConnectionless) {
+                if (zkPort != null || zkRootNode != null) {
+                    throw getMalFormedUrlException(url);
+                } else {
+                    return;
+                }
+            }
+
+            // Normalize connInfo so that a url explicitly specifying versus implicitly inheriting
+            // the default values will both share the same ConnectionQueryServices.
+            if (zkPort == null) {
+                String zkPortString = get(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT);
+                if (zkPortString == null) {
+                    zkPortString = get(HConstants.ZOOKEEPER_CLIENT_PORT);
+                }
+                if (zkPortString == null) {
+                    zkPort = HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
+                } else {
+                    try {
+                        zkPort = Integer.parseInt(zkPortString);
+                    } catch (NumberFormatException e) {
+                        // Report a bad configured port the same way as a bad port in the URL
+                        // instead of leaking an unchecked exception to the JDBC caller.
+                        throw getMalFormedUrlException(
+                            "Invalid ZooKeeper client port '" + zkPortString
+                                    + "' in configuration : " + url);
+                    }
+                }
+            }
+
+            if (zkHosts == null) {
+                zkHosts = get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+                if (zkHosts == null) {
+                    zkHosts = get(HConstants.ZOOKEEPER_QUORUM);
+                }
+                if (zkHosts == null) {
+                    throw getMalFormedUrlException(
+                        "Quorum not specified and hbase.client.zookeeper.quorum is not set"
+                                + " in configuration : " + url);
+                }
+            } else {
+                zkHosts = zkHosts.replaceAll("=", ":");
+            }
+
+            zkHosts = normalizeHostsList(zkHosts, zkPort);
+            // normalize out zkPort
+            zkPort = null;
+
+            if (zkRootNode == null) {
+                zkRootNode =
+                        get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+            }
+        }
+    }
+
+    /**
+     * @param config the configuration to inspect, may be null
+     * @return true if the configuration explicitly selects the ZooKeeper connection registry
+     */
+    public static boolean isZK(Configuration config) {
+        // Default is handled by the caller
+        return config != null
+                && ZK_REGISTRY_NAME.equals(config.get(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY));
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 8299baaee3..58eb0bdd20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -48,9 +48,9 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
@@ -199,7 +199,9 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
// ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job.
String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
- PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum);
+ ConnectionInfo info =
+ ConnectionInfo.create(PhoenixRuntime.JDBC_PROTOCOL_ZK + ":" + zkQuorum, conf,
+ null, null);
LOGGER.info("Configuring HBase connection to {}", info);
for (Map.Entry<String,String> entry : info.asProps()) {
if (LOGGER.isDebugEnabled()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index efd602833c..ccd55fd059 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -25,92 +25,117 @@ import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
-
/**
* Utility class to return a {@link Connection} .
*/
public class ConnectionUtil {
+ // Terminator character followed by the test-driver URL parameter; the connection factory
+ // methods of this class look for it inside the configured quorum string.
+ private static final String TEST_PARAM =
+ PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
/**
* Retrieve the configured input Connection.
- *
* @param conf configuration containing connection information
* @return the configured input connection
*/
public static Connection getInputConnection(final Configuration conf) throws SQLException {
return getInputConnection(conf, new Properties());
}
-
+
/**
* Retrieve the configured input Connection.
- *
* @param conf configuration containing connection information
* @param props custom connection properties
* @return the configured input connection
*/
- public static Connection getInputConnection(final Configuration conf , final Properties props) throws SQLException {
+ public static Connection getInputConnection(final Configuration conf, final Properties props)
+ throws SQLException {
Preconditions.checkNotNull(conf);
- return getConnection(PhoenixConfigurationUtil.getInputCluster(conf),
- PhoenixConfigurationUtil.getClientPort(conf),
- PhoenixConfigurationUtil.getZNodeParent(conf),
- PropertiesUtil.combineProperties(props, conf));
+ // Choose the JDBC URL first, then open the connection in one place.
+ final String url;
+ final String inputQuorum = PhoenixConfigurationUtil.getInputClusterZkQuorum(conf);
+ if (inputQuorum != null) {
+ // An explicitly configured input cluster quorum takes precedence
+ url = "jdbc:phoenix+zk:" + inputQuorum;
+ } else {
+ // FIXME find some better way to get tests working
+ final String testQuorum = PhoenixConfigurationUtil.getZKQuorum(conf);
+ final boolean isTestQuorum = testQuorum != null
+ && (testQuorum.contains(TEST_PARAM)
+ || testQuorum.equals(PhoenixRuntime.CONNECTIONLESS));
+ url = isTestQuorum ? "jdbc:phoenix+zk:" + testQuorum : "jdbc:phoenix";
+ }
+ return DriverManager.getConnection(url, PropertiesUtil.combineProperties(props, conf));
}
/**
* Create the configured output Connection.
- *
* @param conf configuration containing the connection information
* @return the configured output connection
*/
public static Connection getOutputConnection(final Configuration conf) throws SQLException {
return getOutputConnection(conf, new Properties());
}
-
+
/**
* Create the configured output Connection.
- *
* @param conf configuration containing the connection information
* @return the configured output connection
*/
- public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf, Set<String> ignoreTheseProps) throws SQLException {
+ public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf,
+ Set<String> ignoreTheseProps) throws SQLException {
return getOutputConnection(conf, new Properties(), ignoreTheseProps);
}
-
+
/**
* Create the configured output Connection.
- *
* @param conf configuration containing the connection information
* @param props custom connection properties
* @return the configured output connection
*/
- public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException {
+ public static Connection getOutputConnection(final Configuration conf, Properties props)
+ throws SQLException {
return getOutputConnection(conf, props, Collections.<String>emptySet());
}
-
- public static Connection getOutputConnection(final Configuration conf, Properties props, Set<String> withoutTheseProps) throws SQLException {
+
+ /**
+ * Create the configured output Connection.
+ * @param conf configuration containing the connection information
+ * @param props custom connection properties
+ * @param withoutTheseProps property names handed to
+ * {@link PropertiesUtil#combineProperties} as the set to leave out
+ * @return the configured output connection
+ */
+ public static Connection getOutputConnection(final Configuration conf, Properties props,
+ Set<String> withoutTheseProps) throws SQLException {
Preconditions.checkNotNull(conf);
- return getConnection(PhoenixConfigurationUtil.getOutputCluster(conf),
- PhoenixConfigurationUtil.getClientPort(conf),
- PhoenixConfigurationUtil.getZNodeParent(conf),
- PropertiesUtil.combineProperties(props, conf, withoutTheseProps));
+ // Build the connection properties once, applying withoutTheseProps on every path just as
+ // the replaced getConnection() call did.
+ Properties connProps = PropertiesUtil.combineProperties(props, conf, withoutTheseProps);
+ String zkQuorumOverride = PhoenixConfigurationUtil.getOutputClusterZkQuorum(conf);
+ if (zkQuorumOverride != null) {
+ return DriverManager.getConnection("jdbc:phoenix+zk:" + zkQuorumOverride, connProps);
+ } else {
+ // FIXME find some better way to get tests working
+ String zkQuorumForTest = PhoenixConfigurationUtil.getZKQuorum(conf);
+ if (zkQuorumForTest != null && (zkQuorumForTest.contains(TEST_PARAM)
+ || zkQuorumForTest.equals(PhoenixRuntime.CONNECTIONLESS))) {
+ // NOTE(review): getInputConnection uses the "jdbc:phoenix+zk:" prefix for this
+ // case - confirm whether the plain "jdbc:phoenix:" prefix here is intentional.
+ return DriverManager.getConnection("jdbc:phoenix:" + zkQuorumForTest, connProps);
+ }
+ return DriverManager.getConnection("jdbc:phoenix", connProps);
+ }
}
/**
* Returns the {@link Connection} from a ZooKeeper cluster string.
- *
* @param quorum a ZooKeeper quorum connection string
* @param clientPort a ZooKeeper client port
* @param znodeParent a zookeeper znode parent
* @return a Phoenix connection to the given connection string
*/
- private static Connection getConnection(final String quorum, final Integer clientPort, final String znodeParent, Properties props) throws SQLException {
+ @Deprecated
+ private static Connection getConnection(final String quorum, final Integer clientPort,
+ final String znodeParent, Properties props) throws SQLException {
Preconditions.checkNotNull(quorum);
- return DriverManager.getConnection(QueryUtil.getUrl(quorum, clientPort, znodeParent), props);
+ // Build the JDBC URL from the ZooKeeper coordinates, then connect with it
+ final String jdbcUrl = QueryUtil.getUrl(quorum, clientPort, znodeParent);
+ return DriverManager.getConnection(jdbcUrl, props);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 13c5b70e1d..bb338b9128 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -605,12 +605,15 @@ public final class PhoenixConfigurationUtil {
Preconditions.checkNotNull(configuration);
return configuration.get(OUTPUT_TABLE_NAME);
}
-
+
/**
- * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will read from
+ * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will read
+ * from. If MAPREDUCE_INPUT_CLUSTER_QUORUM is not set, then it returns the value of
+ * HConstants.ZOOKEEPER_QUORUM
* @param configuration
* @return ZooKeeper quorum string
*/
+ @Deprecated
public static String getInputCluster(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String quorum = configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
@@ -621,10 +624,35 @@ public final class PhoenixConfigurationUtil {
}
/**
- * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will write to
+ * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will
+ * read from
+ * @param configuration
+ * @return ZooKeeper quorum string if defined, null otherwise
+ */
+ public static String getInputClusterZkQuorum(final Configuration configuration) {
+ // checkNotNull hands back its argument, so the lookup is chained onto the checked value
+ return Preconditions.checkNotNull(configuration).get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
+ }
+
+ /**
+ * Returns the value of HConstants.ZOOKEEPER_QUORUM.
+ * For tests only
+ * @param configuration
+ * @return ZooKeeper quorum string if defined, null otherwise
+ */
+ public static String getZKQuorum(final Configuration configuration) {
+ // checkNotNull hands back its argument, so the lookup is chained onto the checked value
+ return Preconditions.checkNotNull(configuration).get(HConstants.ZOOKEEPER_QUORUM);
+ }
+
+ /**
+ * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will write
+ * to. If MAPREDUCE_OUTPUT_CLUSTER_QUORUM is not set, then it returns the value of
+ * HConstants.ZOOKEEPER_QUORUM
* @param configuration
* @return ZooKeeper quorum string
*/
+ @Deprecated
public static String getOutputCluster(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
@@ -633,12 +661,23 @@ public final class PhoenixConfigurationUtil {
}
return quorum;
}
-
+
+ /**
+ * Returns the ZooKeeper quorum override MAPREDUCE_OUTPUT_CLUSTER_QUORUM for mapreduce jobs
+ * @param configuration
+ * @return ZooKeeper quorum string if defined, null otherwise
+ */
+ public static String getOutputClusterZkQuorum(final Configuration configuration) {
+ // checkNotNull hands back its argument, so the lookup is chained onto the checked value
+ return Preconditions.checkNotNull(configuration).get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
+ }
+
/**
* Returns the HBase Client Port
* @param configuration
* @return
*/
+ @Deprecated
public static Integer getClientPort(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
String clientPortString = configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT);
@@ -695,6 +734,7 @@ public final class PhoenixConfigurationUtil {
* @param configuration
* @return
*/
+ @Deprecated
public static String getZNodeParent(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index bf161a9499..dcc29f980e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -231,9 +231,9 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixTransactionalIndexer;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.log.ConnectionLimiter;
import org.apache.phoenix.log.DefaultConnectionLimiter;
import org.apache.phoenix.log.LoggingConnectionLimiter;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index bd66df3dac..7da182857d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -56,9 +56,9 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.log.QueryLoggerDisruptor;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index a273403195..d300929ac7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -126,8 +126,11 @@ public interface QueryServices extends SQLCloseable {
public static final String HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB = "hbase.client.scanner.timeout.period";
public static final String RPC_TIMEOUT_ATTRIB = "hbase.rpc.timeout";
public static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir";
+ @Deprecated // Use HConstants directly
public static final String ZOOKEEPER_QUORUM_ATTRIB = "hbase.zookeeper.quorum";
+ @Deprecated // Use HConstants directly
public static final String ZOOKEEPER_PORT_ATTRIB = "hbase.zookeeper.property.clientPort";
+ @Deprecated // Use HConstants directly
public static final String ZOOKEEPER_ROOT_NODE_ATTRIB = "zookeeper.znode.parent";
public static final String DISTINCT_VALUE_COMPRESS_THRESHOLD_ATTRIB = "phoenix.distinct.value.compress.threshold";
public static final String SEQUENCE_CACHE_SIZE_ATTRIB = "phoenix.sequence.cacheSize";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java
index 3c034e4b92..9fc001876d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java
@@ -29,7 +29,6 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.metrics.MetricInfo;
import org.apache.phoenix.query.QueryServices;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
index 0dc25b6f05..9440da030c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
@@ -21,10 +21,8 @@ import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
-import org.apache.htrace.impl.MilliSpan;
import org.apache.phoenix.metrics.MetricInfo;
import org.apache.phoenix.query.QueryServicesOptions;
import org.slf4j.Logger;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
index 5d2ef59722..cde98bcde6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java
@@ -22,8 +22,8 @@ import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
public class NotAvailableTransactionProvider implements PhoenixTransactionProvider {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
index bbddc1329c..4a34ae4340 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -32,8 +32,8 @@ import org.apache.phoenix.coprocessor.OmidGCProcessor;
import org.apache.phoenix.coprocessor.OmidTransactionalProcessor;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
public class OmidTransactionProvider implements PhoenixTransactionProvider {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
index d730c677a8..dd07c22528 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
@@ -23,8 +23,8 @@ import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
public interface PhoenixTransactionProvider {
public enum Feature {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index 848154f740..6e72fc1940 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -21,14 +21,14 @@ import static org.apache.phoenix.thirdparty.com.google.common.collect.Maps.newHa
import static org.apache.phoenix.util.PhoenixRuntime.ANNOTATION_ATTRIB_PREFIX;
import java.sql.SQLException;
-import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.phoenix.jdbc.ConnectionInfo;
+import org.apache.phoenix.jdbc.ZKConnectionInfo;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PName;
@@ -207,24 +207,33 @@ public class JDBCUtil {
}
/**
- * Formats a zkUrl which includes the zkQuroum of the jdbc url and the rest to sort the zk quorum hosts.
- * Example input zkUrl "zk1.net,zk2.net,zk3.net:2181:/hbase"
- * Example input zkUrl "zk1.net,zk2.net,zk3.net:2181:/hbase:user_foo"
- * Returns: zk1.net,zk2.net,zk3.net:2181:/hbase
+ * Get the ZK quorum and root node part of the URL, which is used by the HA code internally
+ * to identify the clusters.
+ * As we interpret a missing protocol as ZK, this is mostly idempotent for zk quorum strings.
+ *
+ * @param jdbcUrl JDBC URL
+ * @return part of the URL determining the ZK quorum and node
+ * @throws RuntimeException if the URL is invalid, or does not resolve to a ZK Registry
+ * connection
*/
- //TODO: Adjust for non-zkurl
- public static String formatZookeeperUrl(String zkUrl){
- String lowerZkUrl = zkUrl.toLowerCase();
- String[] components = lowerZkUrl.split(String.valueOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR));
- Preconditions.checkArgument(components.length > 0, "Unexpected zk url format.");
- String[] hosts = components[0].split(",");
- Preconditions.checkArgument(hosts.length > 0,"Unexpected zk url format no hosts found.");
- String hostsStrings = Arrays.stream(hosts).sorted().collect(Collectors.joining(","));
- components[0] = hostsStrings;
- // host:port:path:principal
- // additional arguments passed in url, strip them out
- int endIdx = Integer.min(components.length, 3);
- return Arrays.stream(components, 0, endIdx).collect(Collectors.joining(":"));
+ public static String formatZookeeperUrl(String jdbcUrl) {
+ try {
+ final ConnectionInfo parsed = ConnectionInfo.create(jdbcUrl, null, null);
+ // TODO in theory we could support non-ZK registries for HA.
+ // However, as HA already relies on ZK, this wouldn't be particularly useful,
+ // and would require significant changes.
+ if (parsed instanceof ZKConnectionInfo) {
+ final ZKConnectionInfo zkInfo = (ZKConnectionInfo) parsed;
+ // Prefix every ':' of the host list with a backslash
+ final String escapedHosts = zkInfo.getZkHosts().replace(":", "\\:");
+ return escapedHosts + "::" + zkInfo.getZkRootNode();
+ }
+ // Raised as SQLException so the catch below wraps it like any other parse failure
+ throw new SQLException("HA connections must use ZooKeeper registry. " + jdbcUrl
+ + " is not a Zookeeper HBase connection");
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
-
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 6dd9b861d8..04be259c4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -17,7 +17,10 @@
*/
package org.apache.phoenix.util;
+import static org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILURES;
import static org.apache.phoenix.schema.types.PDataType.ARRAY_TYPE_SUFFIX;
+import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.io.FileInputStream;
@@ -45,22 +48,13 @@ import java.util.TreeSet;
import javax.annotation.Nullable;
-import org.apache.phoenix.jdbc.PhoenixMonitoredConnection;
-import org.apache.phoenix.jdbc.PhoenixMonitoredResultSet;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
-import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
-
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
@@ -69,13 +63,16 @@ import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixMonitoredConnection;
+import org.apache.phoenix.jdbc.PhoenixMonitoredResultSet;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
-import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.monitoring.GlobalMetric;
+import org.apache.phoenix.monitoring.HistogramDistribution;
import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.monitoring.PhoenixTableMetric;
+import org.apache.phoenix.monitoring.TableMetricsManager;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -96,20 +93,19 @@ import org.apache.phoenix.schema.RowKeyValueAccessor;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.monitoring.HistogramDistribution;
-import org.apache.phoenix.monitoring.PhoenixTableMetric;
-import org.apache.phoenix.monitoring.TableMetricsManager;
-
import org.apache.phoenix.thirdparty.com.google.common.base.Function;
import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
import org.apache.phoenix.thirdparty.com.google.common.base.Splitter;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import static org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILURES;
-import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
/**
*
@@ -119,39 +115,77 @@ import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions
* @since 0.1
*/
public class PhoenixRuntime {
- public final static char JDBC_PROTOCOL_TERMINATOR = ';';
- public final static char JDBC_PROTOCOL_SEPARATOR = ':';
+ //TODO use strings, char needs a lot of error-prone conversions
+ public static final char JDBC_PROTOCOL_TERMINATOR = ';';
+ public static final char JDBC_PROTOCOL_SEPARATOR = ':';
/**
* JDBC URL jdbc protocol identifier
*/
- public final static String JDBC_PROTOCOL_IDENTIFIER = "jdbc";
+ public static final String JDBC_PROTOCOL_IDENTIFIER = "jdbc";
/**
- * JDBC URL phoenix protocol identifier
+ * JDBC URL phoenix protocol identifier (protocol determined from Configuration)
+ */
+ public static final String JDBC_PHOENIX_PROTOCOL_IDENTIFIER = "phoenix";
+
+ /**
+ * JDBC URL phoenix protocol identifier for ZK HBase connection
*/
- public final static String JDBC_PHOENIX_PROTOCOL_IDENTIFIER = "phoenix";
+ public static final String JDBC_PHOENIX_PROTOCOL_IDENTIFIER_ZK = "phoenix+zk";
+
+ /**
+ * JDBC URL phoenix protocol identifier for the deprecated Master based HBase connection
+ */
+ public static final String JDBC_PHOENIX_PROTOCOL_IDENTIFIER_MASTER = "phoenix+master";
+
+ /**
+ * JDBC URL phoenix protocol identifier for RPC based HBase connection
+ */
+ public static final String JDBC_PHOENIX_PROTOCOL_IDENTIFIER_RPC = "phoenix+rpc";
/**
* JDBC URL phoenix protocol identifier
*/
- public final static String JDBC_PHOENIX_THIN_IDENTIFIER = "thin";
+ public static final String JDBC_PHOENIX_THIN_IDENTIFIER = "thin";
+
+ /**
+ * Root for the generic JDBC URL that the Phoenix accepts.
+ */
+ public static final String JDBC_PROTOCOL =
+ JDBC_PROTOCOL_IDENTIFIER + JDBC_PROTOCOL_SEPARATOR + JDBC_PHOENIX_PROTOCOL_IDENTIFIER;
/**
- * Root for the JDBC URL that the Phoenix accepts accepts.
+ * Root for the explicit ZK JDBC URL that the Phoenix accepts.
*/
- public final static String JDBC_PROTOCOL = JDBC_PROTOCOL_IDENTIFIER + JDBC_PROTOCOL_SEPARATOR + JDBC_PHOENIX_PROTOCOL_IDENTIFIER;
+ public static final String JDBC_PROTOCOL_ZK =
+ JDBC_PROTOCOL_IDENTIFIER + JDBC_PROTOCOL_SEPARATOR
+ + JDBC_PHOENIX_PROTOCOL_IDENTIFIER_ZK;
/**
- * Root for the JDBC URL used by the thin driver. Duplicated here to avoid dependencies
- * between modules.
+ * Root for the explicit Master (HRPC) JDBC URL that the Phoenix accepts.
*/
- public final static String JDBC_THIN_PROTOCOL = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + JDBC_PHOENIX_THIN_IDENTIFIER;
+ public static final String JDBC_PROTOCOL_MASTER =
+ JDBC_PROTOCOL_IDENTIFIER + JDBC_PROTOCOL_SEPARATOR
+ + JDBC_PHOENIX_PROTOCOL_IDENTIFIER_MASTER;
+ /**
+ * Root for the explicit RPC JDBC URL that the Phoenix accepts.
+ */
+ public static final String JDBC_PROTOCOL_RPC =
+ JDBC_PROTOCOL_IDENTIFIER + JDBC_PROTOCOL_SEPARATOR
+ + JDBC_PHOENIX_PROTOCOL_IDENTIFIER_RPC;
+ /**
+ * Root for the JDBC URL used by the thin driver. Duplicated here to avoid dependencies between
+ * modules.
+ */
+ public static final String JDBC_THIN_PROTOCOL =
+ JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + JDBC_PHOENIX_THIN_IDENTIFIER;
@Deprecated
- public final static String EMBEDDED_JDBC_PROTOCOL = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+ public static final String EMBEDDED_JDBC_PROTOCOL =
+ PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
/**
* Use this connection property to control HBase timestamps
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 00a8d8dc52..fb2e3434bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -65,7 +65,9 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
@@ -74,7 +76,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction;
import org.apache.phoenix.expression.function.IndexStateNameFunction;
import org.apache.phoenix.expression.function.SQLIndexTypeFunction;
@@ -84,8 +85,8 @@ import org.apache.phoenix.expression.function.SqlTypeNameFunction;
import org.apache.phoenix.expression.function.TransactionProviderNameFunction;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.TableName;
@@ -98,18 +99,17 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.tool.SchemaExtractionProcessor;
import org.apache.phoenix.schema.tool.SchemaProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.thirdparty.com.google.common.base.Function;
import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class QueryUtil {
@@ -299,6 +299,7 @@ public final class QueryUtil {
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
+ @Deprecated // deprecated by PHOENIX-6523; NOTE(review): the Javadoc names no replacement — confirm the intended alternative
public static String getUrl(String zkQuorum) {
return getUrlInternal(zkQuorum, null, null, null);
}
@@ -306,6 +307,7 @@ public final class QueryUtil {
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
+ @Deprecated // deprecated by PHOENIX-6523; NOTE(review): the Javadoc names no replacement — confirm the intended alternative
public static String getUrl(String zkQuorum, int clientPort) {
return getUrlInternal(zkQuorum, clientPort, null, null);
}
@@ -313,6 +315,7 @@ public final class QueryUtil {
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
+ @Deprecated // deprecated by PHOENIX-6523; NOTE(review): the Javadoc names no replacement — confirm the intended alternative
public static String getUrl(String zkQuorum, String znodeParent) {
return getUrlInternal(zkQuorum, null, znodeParent, null);
}
@@ -320,6 +323,7 @@ public final class QueryUtil {
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
+ @Deprecated // deprecated by PHOENIX-6523; NOTE(review): the Javadoc names no replacement — confirm the intended alternative
public static String getUrl(String zkQuorum, int port, String znodeParent, String principal) {
return getUrlInternal(zkQuorum, port, znodeParent, principal);
}
@@ -327,6 +331,7 @@ public final class QueryUtil {
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
+ @Deprecated // deprecated by PHOENIX-6523; NOTE(review): the Javadoc names no replacement — confirm the intended alternative
public static String getUrl(String zkQuorum, int port, String znodeParent) {
return getUrlInternal(zkQuorum, port, znodeParent, null);
}
@@ -334,6 +339,7 @@ public final class QueryUtil {
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
+ @Deprecated // deprecated by PHOENIX-6523; NOTE(review): the Javadoc names no replacement — confirm the intended alternative
public static String getUrl(String zkQuorum, Integer port, String znodeParent) {
return getUrlInternal(zkQuorum, port, znodeParent, null);
}
@@ -341,13 +347,15 @@ public final class QueryUtil {
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
+ @Deprecated // deprecated by PHOENIX-6523; NOTE(review): the Javadoc names no replacement — confirm the intended alternative
public static String getUrl(String zkQuorum, Integer port, String znodeParent, String principal) {
return getUrlInternal(zkQuorum, port, znodeParent, principal);
}
+ @Deprecated // shared helper behind the deprecated getUrl(...) overloads
private static String getUrlInternal(String zkQuorum, Integer port, String znodeParent, String principal) {
- return new PhoenixEmbeddedDriver.ConnectionInfo(zkQuorum, port, znodeParent, principal, null).toUrl()
- + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+ return String.join(":", PhoenixRuntime.JDBC_PROTOCOL, zkQuorum, port == null ? "" : port.toString(), znodeParent == null ? "" : znodeParent, principal == null ? "" : principal)
+ + Character.toString(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR); // NOTE(review): port/znodeParent/principal map null to "", but a null zkQuorum is joined as the text "null" — confirm callers never pass null
}
public static String getExplainPlan(ResultSet rs) throws SQLException {
@@ -435,12 +443,18 @@ public final class QueryUtil {
*/
public static String getConnectionUrl(Properties props, Configuration conf, String principal)
throws SQLException {
- // read the hbase properties from the configuration
- int port = getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT, props, conf);
- // Build the ZK quorum server string with "server:clientport" list, separated by ','
- final String server = getString(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST, props, conf);
- String znodeParent = getString(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT, props, conf);
- String url = getUrl(server, port, znodeParent, principal);
+ ReadOnlyProps propsWithPrincipal;
+ if (principal != null) {
+ Map<String, String> principalProp = new HashMap<>();
+ principalProp.put(QueryServices.HBASE_CLIENT_PRINCIPAL, principal);
+ propsWithPrincipal = new ReadOnlyProps(principalProp.entrySet().iterator());
+ } else {
+ propsWithPrincipal = ReadOnlyProps.EMPTY_PROPS;
+ }
+ ConnectionInfo info =
+ ConnectionInfo.createNoLogin(PhoenixRuntime.JDBC_PROTOCOL, conf, propsWithPrincipal,
+ props);
+ String url = info.toUrl();
if (url.endsWith(PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + "")) {
url = url.substring(0, url.length() - 1);
}
@@ -462,7 +476,7 @@ public final class QueryUtil {
}
return url;
}
-
+
private static int getInt(String key, int defaultValue, Properties props, Configuration conf) {
if (conf == null) {
Preconditions.checkNotNull(props);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
index 9f4229777d..3c2001205d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ClusterRoleRecordTest.java
@@ -42,8 +42,8 @@ import org.slf4j.LoggerFactory;
*/
public class ClusterRoleRecordTest {
private static final Logger LOG = LoggerFactory.getLogger(ClusterRoleRecordTest.class);
- private static final String ZK1 = "zk1-1,zk1-2:2181:/hbase";
- private static final String ZK2 = "zk2-1,zk2-2:2181:/hbase";
+ private static final String ZK1 = "zk1-1\\:2181,zk1-2\\:2181::/hbase";
+ private static final String ZK2 = "zk2-1\\:2181,zk2-2\\:2181::/hbase";
@Rule
public final TestName testName = new TestName();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
index ed74645d70..c64252d530 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFailureTest.java
@@ -46,6 +46,8 @@ public class ParallelPhoenixConnectionFailureTest extends BaseTest {
private static String url =
JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + PhoenixRuntime.CONNECTIONLESS;
+ private static int WAIT_MS = 30000;
+
@Test
public void testExecuteQueryChainFailure() throws SQLException {
HBaseTestingUtility hbaseTestingUtility = new HBaseTestingUtility();
@@ -77,16 +79,16 @@ public class ParallelPhoenixConnectionFailureTest extends BaseTest {
parallelConn.createStatement().execute("SELECT * FROM SYSTEM.CATALOG");
parallelConn.createStatement().execute("SELECT * FROM SYSTEM.CATALOG");
// Verify successful execution on both connections
- hbaseTestingUtility.waitFor(10000, () -> (numStatementsCreatedOnConn1.get() == 2)
+ hbaseTestingUtility.waitFor(WAIT_MS, () -> (numStatementsCreatedOnConn1.get() == 2)
&& (numStatementsCreatedOnConn2.get() == 2));
// Error on conn1, we shouldn't use conn1 after that
doThrow(new SQLException()).when(connSpy1).createStatement();
parallelConn.createStatement().execute("SELECT * FROM SYSTEM.CATALOG");
- hbaseTestingUtility.waitFor(10000, () -> numStatementsCreatedOnConn2.get() == 3);
+ hbaseTestingUtility.waitFor(WAIT_MS, () -> numStatementsCreatedOnConn2.get() == 3);
doAnswer(answer1).when(connSpy1).createStatement();
// Should still have a successful execution only from conn2 since conn1 errored before
parallelConn.createStatement().execute("SELECT * FROM SYSTEM.CATALOG");
- hbaseTestingUtility.waitFor(10000, () -> (numStatementsCreatedOnConn1.get() == 2)
+ hbaseTestingUtility.waitFor(WAIT_MS, () -> (numStatementsCreatedOnConn1.get() == 2)
&& (numStatementsCreatedOnConn2.get() == 4));
// Any task that we chain on conn1 should error out
assertTrue(context.chainOnConn1(() -> Boolean.TRUE).isCompletedExceptionally());
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
index 1cc3f3f346..fe439d7462 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
@@ -20,7 +20,10 @@ package org.apache.phoenix.jdbc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
import java.sql.Driver;
import java.sql.DriverManager;
@@ -28,121 +31,324 @@ import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.query.HBaseFactoryProvider;
import org.junit.Test;
public class PhoenixEmbeddedDriverTest {
+
@Test
- public void testGetConnectionInfo() throws SQLException {
+ public void testGetZKConnectionInfo() throws SQLException { // parses every URL under both ZK protocol spellings and checks the resulting fields
+ Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ String defaultQuorum = config.get(HConstants.ZOOKEEPER_QUORUM);
+
+ for (String protocol : new String[] { "phoenix", "phoenix+zk" }) {
+ String[] urls =
+ new String[] { null,
+ "",
+ "jdbc:" + protocol + "",
+ "jdbc:" + protocol + ";test=true",
+ "jdbc:" + protocol + ":localhost",
+ "localhost",
+ "localhost;",
+ "jdbc:" + protocol + ":localhost:123",
+ "jdbc:" + protocol + ":localhost:123;foo=bar",
+ "localhost:123",
+ "jdbc:" + protocol + ":localhost:123:/hbase",
+ "jdbc:" + protocol + ":localhost:123:/foo-bar",
+ "jdbc:" + protocol + ":localhost:123:/foo-bar;foo=bas",
+ "localhost:123:/foo-bar",
+ "jdbc:" + protocol + ":localhost:/hbase",
+ "jdbc:" + protocol + ":localhost:/foo-bar",
+ "jdbc:" + protocol + ":localhost:/foo-bar;test=true",
+ "localhost:/foo-bar",
+ "jdbc:" + protocol + ":v1,v2,v3",
+ "jdbc:" + protocol + ":v1,v2,v3;",
+ "jdbc:" + protocol + ":v1,v2,v3;test=true",
+ "v1,v2,v3",
+ "jdbc:" + protocol + ":v1,v2,v3:/hbase",
+ "jdbc:" + protocol + ":v1,v2,v3:/hbase;test=true",
+ "v1,v2,v3:/foo-bar",
+ "jdbc:" + protocol + ":v1,v2,v3:123:/hbase",
+ "v1,v2,v3:123:/hbase",
+ "jdbc:" + protocol + ":v1,v2,v3:123:/hbase;test=false",
+ "jdbc:" + protocol + ":v1,v2,v3:123:/hbase:user/principal:/user.keytab;test=false",
+ "jdbc:" + protocol + ":v1,v2,v3:123:/foo-bar:user/principal:/user.keytab;test=false",
+ "jdbc:" + protocol + ":v1,v2,v3:123:user/principal:/user.keytab;test=false",
+ "jdbc:" + protocol + ":v1,v2,v3:user/principal:/user.keytab;test=false",
+ "jdbc:" + protocol + ":v1,v2,v3:/hbase:user/principal:/user.keytab;test=false",
+ "jdbc:" + protocol + ":v1,v2,v3:LongRunningQueries;test=false",
+ "jdbc:" + protocol + ":v1,v2,v3:345:LongRunningQueries;test=false",
+ "jdbc:" + protocol + ":localhost:1234:user:C:\\user.keytab",
+ "jdbc:" + protocol + ":v1,v2,v3:345:/hbase:user1:C:\\Documents and Settings\\user1\\user1.keytab;test=false", };
+ String[][] partsList = // per URL: {zkHosts, zkPort, zkRootNode[, principal[, keytab]]}; an empty row means nothing is asserted
+ new String[][] { { defaultQuorum + ":2181", null, "/hbase" },
+ { defaultQuorum + ":2181", null, "/hbase" },
+ { defaultQuorum + ":2181", null, "/hbase" }, {},
+ { "localhost:2181", null, "/hbase" },
+ { "localhost:2181", null, "/hbase" },
+ { "localhost:2181", null, "/hbase" },
+ { "localhost:123", null, "/hbase" },
+ { "localhost:123", null, "/hbase" },
+ { "localhost:123", null, "/hbase" },
+ { "localhost:123", null, "/hbase" },
+ { "localhost:123", null, "/foo-bar" },
+ { "localhost:123", null, "/foo-bar" },
+ { "localhost:123", null, "/foo-bar" },
+ { "localhost:2181", null, "/hbase" },
+ { "localhost:2181", null, "/foo-bar" },
+ { "localhost:2181", null, "/foo-bar" },
+ { "localhost:2181", null, "/foo-bar" },
+ { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+ { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+ { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+ { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+ { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+ { "v1:2181,v2:2181,v3:2181", null, "/hbase" },
+ { "v1:2181,v2:2181,v3:2181", null, "/foo-bar" },
+ { "v1:123,v2:123,v3:123", null, "/hbase" },
+ { "v1:123,v2:123,v3:123", null, "/hbase" },
+ { "v1:123,v2:123,v3:123", null, "/hbase" },
+ { "v1:123,v2:123,v3:123", null, "/hbase", "user/principal",
+ "/user.keytab" },
+ { "v1:123,v2:123,v3:123", null, "/foo-bar", "user/principal",
+ "/user.keytab" },
+ { "v1:123,v2:123,v3:123", null, "/hbase", "user/principal",
+ "/user.keytab" },
+ { "v1:2181,v2:2181,v3:2181", null, "/hbase", "user/principal",
+ "/user.keytab" },
+ { "v1:2181,v2:2181,v3:2181", null, "/hbase", "user/principal",
+ "/user.keytab" },
+ { "v1:2181,v2:2181,v3:2181", null, "/hbase", "LongRunningQueries" },
+ { "v1:345,v2:345,v3:345", null, "/hbase", "LongRunningQueries" },
+ { "localhost:1234", null, "/hbase", "user", "C:\\user.keytab" },
+ { "v1:345,v2:345,v3:345", null, "/hbase", "user1",
+ "C:\\Documents and Settings\\user1\\user1.keytab" }, };
+ assertEquals(urls.length, partsList.length);
+ for (int i = 0; i < urls.length; i++) {
+ int pos = 0; // index of the expected field currently being compared; reported on failure
+ try {
+ ZKConnectionInfo info =
+ (ZKConnectionInfo) ConnectionInfo.create(urls[i], null, null);
+ String[] parts = partsList[i];
+ if (parts.length > pos) {
+ assertEquals(parts[pos], info.getZkHosts());
+ }
+ if (parts.length > ++pos) {
+ assertEquals(parts[pos], info.getZkPort());
+ }
+ if (parts.length > ++pos) {
+ assertEquals(parts[pos], info.getZkRootNode());
+ }
+ if (parts.length > ++pos) {
+ assertEquals(parts[pos], info.getPrincipal());
+ }
+ if (parts.length > ++pos) {
+ assertEquals(parts[pos], info.getKeytab());
+ }
+ } catch (AssertionError e) { // add the URL and field index to the failure and keep the original as cause
+ throw new AssertionError(
+ "For \"" + urls[i] + "\" at position: " + pos + ": " + e.getMessage(), e);
+ }
+ }
+ }
+
+ }
+
+ @Test
+ public void testGetMasterConnectionInfo() throws SQLException { // checks bootstrap servers, principal and keytab parsed from each URL with the master registry configured
+ assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.3.0")>=0); // skipped when the reported version is below 2.3.0
Configuration config =
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
- String defaultQuorum = config.get(HConstants.ZOOKEEPER_QUORUM);
+ config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+ String defaultMasters = "defaultmaster1:1243,defaultmaster2:2345";
+ config.set("hbase.masters", defaultMasters);
String[] urls = new String[] {
null,
"",
+ "jdbc:phoenix+master",
+ "jdbc:phoenix+master;test=true",
"jdbc:phoenix",
- "jdbc:phoenix;test=true",
- "jdbc:phoenix:localhost",
+ "jdbc:phoenix+master:localhost",
"localhost",
"localhost;",
- "jdbc:phoenix:localhost:123",
- "jdbc:phoenix:localhost:123;foo=bar",
"localhost:123",
- "jdbc:phoenix:localhost:123:/hbase",
- "jdbc:phoenix:localhost:123:/foo-bar",
- "jdbc:phoenix:localhost:123:/foo-bar;foo=bas",
- "localhost:123:/foo-bar",
- "jdbc:phoenix:localhost:/hbase",
- "jdbc:phoenix:localhost:/foo-bar",
- "jdbc:phoenix:localhost:/foo-bar;test=true",
- "localhost:/foo-bar",
- "jdbc:phoenix:v1,v2,v3",
- "jdbc:phoenix:v1,v2,v3;",
- "jdbc:phoenix:v1,v2,v3;test=true",
- "v1,v2,v3",
- "jdbc:phoenix:v1,v2,v3:/hbase",
- "jdbc:phoenix:v1,v2,v3:/hbase;test=true",
- "v1,v2,v3:/foo-bar",
- "jdbc:phoenix:v1,v2,v3:123:/hbase",
- "v1,v2,v3:123:/hbase",
- "jdbc:phoenix:v1,v2,v3:123:/hbase;test=false",
- "jdbc:phoenix:v1,v2,v3:123:/hbase:user/principal:/user.keytab;test=false",
- "jdbc:phoenix:v1,v2,v3:123:/foo-bar:user/principal:/user.keytab;test=false",
- "jdbc:phoenix:v1,v2,v3:123:user/principal:/user.keytab;test=false",
- "jdbc:phoenix:v1,v2,v3:user/principal:/user.keytab;test=false",
- "jdbc:phoenix:v1,v2,v3:/hbase:user/principal:/user.keytab;test=false",
- "jdbc:phoenix:v1,v2,v3:LongRunningQueries;test=false",
- "jdbc:phoenix:v1,v2,v3:345:LongRunningQueries;test=false",
- "jdbc:phoenix:localhost:1234:user:C:\\user.keytab",
- "jdbc:phoenix:v1,v2,v3:345:/hbase:user1:C:\\Documents and Settings\\user1\\user1.keytab;test=false",
+ "localhost,localhost2:123;",
+ "localhost\\:123",
+ "localhost\\:123:",
+ "localhost\\:123::",
+ "localhost\\:123:::",
+ "localhost\\:123::::",
+ "localhost\\:123:::::",
+ "localhost\\:123:345::::",
+ "localhost,localhost2\\:123;",
+ "localhost,localhost2\\:123:456",
+ "localhost,localhost2\\:123:456;test=false",
+ "localhost\\:123:::user/principal:/user.keytab",
+ "localhost\\:123:::LongRunningQueries",
+ "localhost\\:123:::LongRunningQueries:",
+ "localhost\\:123:::LongRunningQueries::",
+ "localhost\\:123:::user/principal:C:\\user.keytab",
+ "localhost\\:123:::user/principal:C:\\Documents and Settings\\user1\\user1.keytab",
};
- ConnectionInfo[] infos = new ConnectionInfo[] {
- new ConnectionInfo(defaultQuorum, 2181, "/hbase"),
- new ConnectionInfo(defaultQuorum, 2181, "/hbase"),
- new ConnectionInfo(defaultQuorum, 2181, "/hbase"),
- new ConnectionInfo(null,null,null),
- new ConnectionInfo("localhost",null,null),
- new ConnectionInfo("localhost",null,null),
- new ConnectionInfo("localhost",null,null),
- new ConnectionInfo("localhost",123,null),
- new ConnectionInfo("localhost",123,null),
- new ConnectionInfo("localhost",123,null),
- new ConnectionInfo("localhost",123,"/hbase"),
- new ConnectionInfo("localhost",123,"/foo-bar"),
- new ConnectionInfo("localhost",123,"/foo-bar"),
- new ConnectionInfo("localhost",123,"/foo-bar"),
- new ConnectionInfo("localhost",null,"/hbase"),
- new ConnectionInfo("localhost",null,"/foo-bar"),
- new ConnectionInfo("localhost",null,"/foo-bar"),
- new ConnectionInfo("localhost",null,"/foo-bar"),
- new ConnectionInfo("v1,v2,v3",null,null),
- new ConnectionInfo("v1,v2,v3",null,null),
- new ConnectionInfo("v1,v2,v3",null,null),
- new ConnectionInfo("v1,v2,v3",null,null),
- new ConnectionInfo("v1,v2,v3",null,"/hbase"),
- new ConnectionInfo("v1,v2,v3",null,"/hbase"),
- new ConnectionInfo("v1,v2,v3",null,"/foo-bar"),
- new ConnectionInfo("v1,v2,v3",123,"/hbase"),
- new ConnectionInfo("v1,v2,v3",123,"/hbase"),
- new ConnectionInfo("v1,v2,v3",123,"/hbase"),
- new ConnectionInfo("v1,v2,v3",123,"/hbase","user/principal", "/user.keytab" ),
- new ConnectionInfo("v1,v2,v3",123,"/foo-bar","user/principal", "/user.keytab" ),
- new ConnectionInfo("v1,v2,v3",123, null,"user/principal", "/user.keytab" ),
- new ConnectionInfo("v1,v2,v3", null, null,"user/principal", "/user.keytab" ),
- new ConnectionInfo("v1,v2,v3",null,"/hbase","user/principal", "/user.keytab" ),
- new ConnectionInfo("v1,v2,v3",null,null,"LongRunningQueries", null ),
- new ConnectionInfo("v1,v2,v3",345,null,"LongRunningQueries", null ),
- new ConnectionInfo("localhost", 1234, null, "user", "C:\\user.keytab"),
- new ConnectionInfo("v1,v2,v3", 345, "/hbase", "user1", "C:\\Documents and Settings\\user1\\user1.keytab"),
+ String[][] partsList = new String[][] { // per URL: {bootstrapServers[, principal[, keytab]]}; missing entries are asserted null
+ {defaultMasters},
+ {defaultMasters},
+ {defaultMasters},
+ {defaultMasters},
+ {defaultMasters},
+ {"localhost:"+HConstants.DEFAULT_MASTER_PORT},
+ {"localhost:"+HConstants.DEFAULT_MASTER_PORT},
+ {"localhost:"+HConstants.DEFAULT_MASTER_PORT},
+ {"localhost:123"},
+ {"localhost2:123,localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost2:123,localhost:"+HConstants.DEFAULT_MASTER_PORT}, // same default-port constant as the rows above instead of a literal
+ {"localhost2:123,localhost:456"},
+ {"localhost2:123,localhost:456"},
+ {"localhost:123","user/principal","/user.keytab"},
+ {"localhost:123","LongRunningQueries",null},
+ {"localhost:123","LongRunningQueries",null},
+ {"localhost:123","LongRunningQueries",null},
+ {"localhost:123","user/principal","C:\\user.keytab"},
+ {"localhost:123","user/principal","C:\\Documents and Settings\\user1\\user1.keytab"},
};
- assertEquals(urls.length,infos.length);
+ assertEquals(urls.length,partsList.length);
for (int i = 0; i < urls.length; i++) {
try {
- ConnectionInfo info = ConnectionInfo.create(urls[i]);
- assertEquals(infos[i], info);
+ Configuration testConfig = new Configuration(config); // fresh copy of the prepared config for each URL
+ MasterConnectionInfo info = (MasterConnectionInfo)ConnectionInfo.create(urls[i], testConfig, null, null);
+ String[] parts = partsList[i];
+ assertEquals(parts[0], info.getBoostrapServers());
+ if(parts.length>1) {
+ assertEquals(parts[1], info.getPrincipal());
+ } else {
+ assertNull(info.getPrincipal());
+ }
+ if(parts.length>2) {
+ assertEquals(parts[2], info.getKeytab());
+ } else {
+ assertNull(info.getKeytab());
+ }
} catch (AssertionError e) {
- throw new AssertionError("For \"" + urls[i] + "\": " + e.getMessage());
+ throw new AssertionError("For \"" + urls[i] + "\": " + e.getMessage(), e); // closing quote restored; original failure kept as cause
}
}
}
+
+ @Test
+ public void testGetRPCConnectionInfo() throws SQLException { // checks bootstrap servers, principal and keytab parsed from each URL with the RPC registry configured
+ assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.5.0")>=0); // skipped when the reported version is below 2.5.0
+ Configuration config =
+ HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+ String defaultBoostraps = "defaultmaster1:1243,defaultmaster2:2345";
+ config.set("hbase.client.bootstrap.servers", defaultBoostraps);
+
+ String[] urls = new String[] {
+ null,
+ "",
+ "jdbc:phoenix+rpc",
+ "jdbc:phoenix+rpc\";test=true", // NOTE(review): the \" after the protocol looks like a leftover — confirm it is intended
+ "jdbc:phoenix",
+ "jdbc:phoenix+rpc\":localhost", // NOTE(review): same stray \" as above — confirm it is intended
+ "localhost",
+ "localhost;",
+ "localhost:123",
+ "localhost,localhost2:123;",
+ "localhost\\:123",
+ "localhost\\:123:",
+ "localhost\\:123::",
+ "localhost\\:123:::",
+ "localhost\\:123::::",
+ "localhost\\:123:::::",
+ "localhost\\:123:345::::",
+ "localhost,localhost2\\:123;",
+ "localhost,localhost2\\:123:456",
+ "localhost,localhost2\\:123:456;test=false",
+ "localhost\\:123:::user/principal:/user.keytab",
+ "localhost\\:123:::LongRunningQueries",
+ "localhost\\:123:::LongRunningQueries:",
+ "localhost\\:123:::LongRunningQueries::",
+ "localhost\\:123:::user/principal:C:\\user.keytab",
+ "localhost\\:123:::user/principal:C:\\Documents and Settings\\user1\\user1.keytab",
+ };
+ String[][] partsList = new String[][] { // per URL: {bootstrapServers[, principal[, keytab]]}; missing entries are asserted null
+ {defaultBoostraps},
+ {defaultBoostraps},
+ {defaultBoostraps},
+ {defaultBoostraps},
+ {defaultBoostraps},
+ {"localhost"},
+ {"localhost"},
+ {"localhost"},
+ {"localhost:123"},
+ {"localhost2:123,localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ {"localhost:123"},
+ //No default port
+ {"localhost,localhost2:123"},
+ {"localhost2:123,localhost:456"},
+ {"localhost2:123,localhost:456"},
+ {"localhost:123","user/principal","/user.keytab"},
+ {"localhost:123","LongRunningQueries",null},
+ {"localhost:123","LongRunningQueries",null},
+ {"localhost:123","LongRunningQueries",null},
+ {"localhost:123","user/principal","C:\\user.keytab"},
+ {"localhost:123","user/principal","C:\\Documents and Settings\\user1\\user1.keytab"},
+ };
+ assertEquals(urls.length,partsList.length);
+ for (int i = 0; i < urls.length; i++) {
+ try {
+ Configuration testConfig = new Configuration(config); // fresh copy of the prepared config for each URL
+ RPCConnectionInfo info = (RPCConnectionInfo)ConnectionInfo.create(urls[i], testConfig, null, null);
+ String[] parts = partsList[i];
+ assertEquals(parts[0], info.getBoostrapServers());
+ if(parts.length>1) {
+ assertEquals(parts[1], info.getPrincipal());
+ } else {
+ assertNull(info.getPrincipal());
+ }
+ if(parts.length>2) {
+ assertEquals(parts[2], info.getKeytab());
+ } else {
+ assertNull(info.getKeytab());
+ }
+ } catch (AssertionError e) {
+ throw new AssertionError("For \"" + urls[i] + "\": " + e.getMessage(), e); // closing quote added; original failure kept as cause
+ }
+ }
+ }
+
@Test
public void testNegativeGetConnectionInfo() throws SQLException {
String[] urls = new String[] {
- "jdbc:phoenix::",
- "jdbc:phoenix:;",
+ //Reject unescaped ports in quorum string
"jdbc:phoenix:v1:1,v2:2,v3:3",
"jdbc:phoenix:v1:1,v2:2,v3:3;test=true",
"jdbc:phoenix:v1,v2,v3:-1:/hbase;test=true",
"jdbc:phoenix:v1,v2,v3:-1",
- "jdbc:phoenix:v1,v2,v3:123::/hbase",
- "jdbc:phoenix:v1,v2,v3:123::/hbase;test=false",
+ "jdbc:phoenix+zk:v1:1,v2:2,v3:3",
+ "jdbc:phoenix+zk:v1:1,v2:2,v3:3;test=true",
+ "jdbc:phoenix+zk:v1,v2,v3:-1:/hbase;test=true",
+ "jdbc:phoenix+zk:v1,v2,v3:-1"
};
for (String url : urls) {
try {
- ConnectionInfo.create(url);
+ ConnectionInfo.create(url, null, null);
throw new AssertionError("Expected exception for \"" + url + "\"");
} catch (SQLException e) {
try {
@@ -153,7 +359,171 @@ public class PhoenixEmbeddedDriverTest {
}
}
}
-
+
+ @Test
+ public void testRPCNegativeGetConnectionInfo() throws SQLException { // every URL below must be rejected with MALFORMED_CONNECTION_URL
+ assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.5.0")>=0); // skipped when the reported version is below 2.5.0
+ String[] urls = new String[] {
+ //Reject unescaped and invalid ports in quorum string
+ "jdbc:phoenix+rpc:v1:1,v2:2,v3:3",
+ "jdbc:phoenix+rpc:v1:1,v2:2,v3:3;test=true",
+ "jdbc:phoenix+rpc:v1,v2,v3:-1:/hbase;test=true",
+ "jdbc:phoenix+rpc:v1,v2,v3:-1",
+ "jdbc:phoenix+master:v1:1,v2:2,v3:3",
+ "jdbc:phoenix+master:v1:1,v2:2,v3:3;test=true",
+ "jdbc:phoenix+master:v1,v2,v3:-1:/hbase;test=true",
+ "jdbc:phoenix+master:v1,v2,v3:-1",
+ //Reject rootnode and missing empty rootnode field
+ "jdbc:phoenix+rpc:localhost,localhost2\\:123:456:rootNode",
+ "jdbc:phoenix+rpc:localhost,localhost2\\:123:456:rootNode:prinicpial:keystore",
+ "jdbc:phoenix+rpc:localhost,localhost2\\:123:456:prinicpial",
+ "jdbc:phoenix+rpc:localhost,localhost2\\:123:456:prinicpial:keystore",
+ "jdbc:phoenix+master:localhost,localhost2\\:123:456:rootNode",
+ "jdbc:phoenix+master:localhost,localhost2\\:123:456:rootNode:prinicpial:keystore",
+ "jdbc:phoenix+master:localhost,localhost2\\:123:456:prinicpial",
+ "jdbc:phoenix+master:localhost,localhost2\\:123:456:prinicpial:keystore",
+
+ };
+ for (String url : urls) {
+ try {
+ ConnectionInfo.create(url, null, null);
+ throw new AssertionError("Expected exception for \"" + url + "\""); // not a SQLException, so it escapes the catch below
+ } catch (SQLException e) {
+ try {
+ assertEquals(SQLExceptionCode.MALFORMED_CONNECTION_URL.getSQLState(), e.getSQLState());
+ } catch (AssertionError ae) {
+ throw new AssertionError("For \"" + url + "\": " + ae.getMessage(), ae); // keep the original failure as cause
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testMasterDefaults() throws SQLException { // checks how bootstrap servers are resolved from the URL and the hbase.master* settings
+ assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.3.0") >= 0);
+ try {
+ Configuration config =
+ HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl",
+ "org.apache.hadoop.hbase.client.MasterRegistry");
+ ConnectionInfo.create("jdbc:phoenix+master", config, null, null);
+ fail("Should have thrown exception");
+ } catch (SQLException expected) { // expected failure: this try block sets only the registry impl
+ }
+
+ Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+ config.set("hbase.master.hostname", "master.hostname");
+ MasterConnectionInfo info =
+ (MasterConnectionInfo) ConnectionInfo.create("jdbc:phoenix+master", config, null,
+ null);
+ assertEquals("master.hostname:16000", info.getBoostrapServers()); // expected value first, per the JUnit argument order
+
+ config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+ config.set("hbase.master.hostname", "master.hostname");
+ config.set("hbase.master.port", "17000");
+ info = (MasterConnectionInfo) ConnectionInfo.create("jdbc:phoenix", config, null, null);
+ assertEquals("master.hostname:17000", info.getBoostrapServers());
+
+ config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+ config.set("hbase.master.hostname", "master.hostname");
+ config.set("hbase.master.port", "17000");
+ config.set("hbase.masters", "master1:123,master2:234,master3:345");
+ info = (MasterConnectionInfo) ConnectionInfo.create("jdbc:phoenix", config, null, null);
+ assertEquals("master1:123,master2:234,master3:345", info.getBoostrapServers());
+
+ config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+ config.set("hbase.master.port", "17000");
+ info =
+ (MasterConnectionInfo) ConnectionInfo.create(
+ "jdbc:phoenix+master:master1.from.url,master2.from.url", config, null, null);
+ assertEquals("master1.from.url:17000,master2.from.url:17000", info.getBoostrapServers());
+
+ config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+ config.set("hbase.master.port", "17000");
+ info =
+ (MasterConnectionInfo) ConnectionInfo.create(
+ "jdbc:phoenix+master:master1.from.url\\:123,master2.from.url", config, null,
+ null);
+ assertEquals("master1.from.url:123,master2.from.url:17000", info.getBoostrapServers());
+
+ config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+ config.set("hbase.master.hostname", "master.hostname");
+ config.set("hbase.master.port", "17000");
+ config.set("hbase.masters", "master1:123,master2:234,master3:345");
+ info =
+ (MasterConnectionInfo) ConnectionInfo.create(
+ "jdbc:phoenix:master1.from.url\\:123,master2.from.url:18000", config, null,
+ null);
+ assertEquals("master1.from.url:123,master2.from.url:18000", info.getBoostrapServers());
+
+ config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl", "org.apache.hadoop.hbase.client.MasterRegistry");
+ config.set("hbase.master.hostname", "master.hostname");
+ config.set("hbase.master.port", "17000");
+ config.set("hbase.masters", "master1:123,master2:234,master3:345");
+ info =
+ (MasterConnectionInfo) ConnectionInfo.create(
+ "jdbc:phoenix:master1.from.url\\:123,master2.from.url\\:234:18000", config,
+ null, null);
+ assertEquals("master1.from.url:123,master2.from.url:234", info.getBoostrapServers());
+ }
+
+ @Test
+ public void testRPCDefaults() throws SQLException { // checks how bootstrap servers are resolved from the URL and from configuration for the RPC registry
+ assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.5.0") >= 0);
+ try {
+ Configuration config =
+ HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl",
+ "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+ ConnectionInfo.create("jdbc:phoenix+rpc", config, null, null);
+ fail("Should have thrown exception");
+ } catch (SQLException expected) { // expected failure: this try block sets only the registry impl
+ }
+
+ Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl",
+ "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+ config.set("hbase.client.bootstrap.servers", "bootstrap1\\:123,boostrap2\\:234");
+ RPCConnectionInfo info =
+ (RPCConnectionInfo) ConnectionInfo.create("jdbc:phoenix+rpc", config, null, null);
+ assertEquals("bootstrap1\\:123,boostrap2\\:234", info.getBoostrapServers()); // expected value first, per the JUnit argument order
+
+ config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl",
+ "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+ info =
+ (RPCConnectionInfo) ConnectionInfo.create(
+ "jdbc:phoenix+rpc:bootstrap1.from.url,bootstrap2.from.url", config, null, null);
+ // TODO looks like HBase doesn't do port replacement/check for RPC servers either ?
+ assertEquals("bootstrap1.from.url,bootstrap2.from.url", info.getBoostrapServers());
+
+ config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl",
+ "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+ info =
+ (RPCConnectionInfo) ConnectionInfo.create(
+ "jdbc:phoenix+rpc:bootstrap1.from.url\\:123,bootstrap2.from.url\\::234", config,
+ null, null);
+ // TODO looks like HBase doesn't do port replacement/check for RPC servers either ?
+ assertEquals("bootstrap1.from.url:123,bootstrap2.from.url:234", info.getBoostrapServers());
+
+ // Check fallback to master properties
+ config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ config.set("hbase.client.registry.impl",
+ "org.apache.hadoop.hbase.client.RpcConnectionRegistry");
+ config.set("hbase.masters", "master1:123,master2:234,master3:345");
+ info = (RPCConnectionInfo) ConnectionInfo.create("jdbc:phoenix+rpc", config, null, null);
+ // TODO looks like HBase doesn't do port replacement/check for RPC servers either ?
+ assertEquals("master1:123,master2:234,master3:345", info.getBoostrapServers());
+ }
+
@Test
public void testNotAccept() throws Exception {
Driver driver = new PhoenixDriver();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index 8146368304..368a9c52a4 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.ConnectionlessQueryServicesImpl;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesTestImpl;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -88,16 +89,15 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
}
@Override // public for testing
- public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
+ public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties infoIn) throws SQLException {
checkClosed();
- ConnectionInfo connInfo = ConnectionInfo.create(url);
+ final Properties info = PropertiesUtil.deepCopy(infoIn);
+ ConnectionInfo connInfo = ConnectionInfo.create(url, null, info);
ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(connInfo);
if (connectionQueryServices != null) {
return connectionQueryServices;
}
- if (info == null) {
- info = new Properties();
- }
+ info.putAll(connInfo.asProps().asMap());
if (connInfo.isConnectionless()) {
connectionQueryServices = new ConnectionlessQueryServicesImpl(queryServices, connInfo, info);
} else {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
index 7dab05f9e2..b3677b27d2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
@@ -18,7 +18,9 @@
package org.apache.phoenix.mapreduce;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapred.JobContext;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.Test;
import org.mockito.Mockito;
@@ -40,6 +42,7 @@ public class PhoenixMultiViewInputFormatTest {
Configuration config = new Configuration();
config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
+ config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
JobContext mockContext = Mockito.mock(JobContext.class);
when(mockContext.getConfiguration()).thenReturn(config);
@@ -55,6 +58,7 @@ public class PhoenixMultiViewInputFormatTest {
Configuration config = new Configuration();
config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
config.set(MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ, "dummy.path");
+ config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
JobContext mockContext = Mockito.mock(JobContext.class);
when(mockContext.getConfiguration()).thenReturn(config);
@@ -73,6 +77,7 @@ public class PhoenixMultiViewInputFormatTest {
Configuration config = new Configuration();
config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
config.set(MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ, "dummy.path");
+ config.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS);
JobContext mockContext = Mockito.mock(JobContext.class);
when(mockContext.getConfiguration()).thenReturn(config);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index c316fa45c0..b678514c64 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.phoenix.mapreduce.util;
+import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
import static org.junit.Assert.assertEquals;
import java.sql.Connection;
@@ -41,7 +44,12 @@ import org.junit.Test;
public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
private static final String ORIGINAL_CLUSTER_QUORUM = "myzookeeperhost";
private static final String OVERRIDE_CLUSTER_QUORUM = "myoverridezookeeperhost";
-
+
+ // This is a hack that relies on the way the URL is reconstructed from the Configuration to
+ // generate a test connection for the MR jobs.
+ protected static String TEST_ZK_QUORUM =
+ CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM + JDBC_PROTOCOL_TERMINATOR;
+
@Test
/**
* This test reproduces the bug filed in PHOENIX-2310.
@@ -68,7 +76,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
" AS SELECT * FROM " + tableName + "\n";
conn.createStatement().execute(viewDdl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
PhoenixConfigurationUtil.setOutputTableName(configuration, viewName);
PhoenixConfigurationUtil.setPhysicalTableName(configuration, viewName);
PhoenixConfigurationUtil.setUpsertColumnNames(configuration, new String[] {"A_STRING", "A_BINARY", "COL1"});
@@ -97,7 +105,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
" CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
PhoenixConfigurationUtil.setPhysicalTableName(configuration, tableName);
PhoenixConfigurationUtil.setUpsertColumnNames(configuration, new String[] {"A_STRING", "A_BINARY", "COL1"});
@@ -124,7 +132,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
" CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
PhoenixConfigurationUtil.setPhysicalTableName(configuration, tableName);
final String upserStatement = PhoenixConfigurationUtil.getUpsertStatement(configuration);
@@ -145,7 +153,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
" CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + tableName ;
@@ -167,7 +175,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
" CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
PhoenixConfigurationUtil.setInputTableName(configuration, fullTableName);
final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + fullTableName;
@@ -187,7 +195,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
" CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
PhoenixConfigurationUtil.setSelectColumnNames(configuration, new String[]{"A_BINARY"});
final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
@@ -207,7 +215,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
" (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])\n";
conn.createStatement().execute(ddl);
final Configuration configuration = new Configuration ();
- configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, TEST_ZK_QUORUM);
PhoenixConfigurationUtil.setSelectColumnNames(configuration,new String[]{"ID","VCARRAY"});
PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/transaction/OmidTransactionService.java b/phoenix-core/src/test/java/org/apache/phoenix/transaction/OmidTransactionService.java
index 64e721c04f..02531faf77 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/transaction/OmidTransactionService.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/transaction/OmidTransactionService.java
@@ -35,7 +35,7 @@ import org.apache.omid.tso.client.OmidClientConfiguration;
import org.apache.omid.tso.client.TSOClient;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import com.google.inject.Guice;
import com.google.inject.Injector;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/transaction/TransactionServiceManager.java b/phoenix-core/src/test/java/org/apache/phoenix/transaction/TransactionServiceManager.java
index f3c1bb9ce2..ebb892101e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/transaction/TransactionServiceManager.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/transaction/TransactionServiceManager.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.transaction;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
public class TransactionServiceManager {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
index 73eb637113..11add1ffcb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java
@@ -144,7 +144,7 @@ public class JDBCUtilTest {
@Test
public void formatZookeeperUrlSameOrderTest() {
- String zk1 = "zk1.net,zk2.net,zk3.net:2181:/hbase";
+ String zk1 = "zk1.net\\:2181,zk2.net\\:2181,zk3.net\\:2181::/hbase";
String result = JDBCUtil.formatZookeeperUrl(zk1);
assertEquals(zk1,result);
}
@@ -154,21 +154,21 @@ public class JDBCUtilTest {
public void formatZookeeperUrlDifferentOrderTest() {
String zk1 = "zk3.net,zk2.net,zk1.net:2181:/hbase";
String result = JDBCUtil.formatZookeeperUrl(zk1);
- assertEquals("zk1.net,zk2.net,zk3.net:2181:/hbase",result);
+ assertEquals("zk1.net\\:2181,zk2.net\\:2181,zk3.net\\:2181::/hbase",result);
}
@Test
public void formatZookeeperUrlNoTrailersTest() {
String zk1 = "zk1.net,zk2.net,zk3.net";
String result = JDBCUtil.formatZookeeperUrl(zk1);
- assertEquals(zk1,result);
+ assertEquals("zk1.net\\:2181,zk2.net\\:2181,zk3.net\\:2181::/hbase",result);
}
@Test
public void formatZookeeperUrlToLowercaseTest() {
String zk1 = "MYHOST1.NET,MYHOST2.NET";
String result = JDBCUtil.formatZookeeperUrl(zk1);
- assertEquals(zk1.toLowerCase(),result);
+ assertEquals("myhost1.net\\:2181,myhost2.net\\:2181::/hbase",result);
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 63dd88581f..0719dc5a61 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.sql.Types;
+import java.util.Arrays;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -141,18 +142,25 @@ public class QueryUtilTest {
private void validateUrl(String url) {
String prefix = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
- assertTrue("JDBC URL missing jdbc protocol prefix", url.startsWith(prefix));
+ String zkPrefix = PhoenixRuntime.JDBC_PROTOCOL_ZK+ PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+ String masterPrefix = PhoenixRuntime.JDBC_PROTOCOL_MASTER + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+ String rpcPrefix = PhoenixRuntime.JDBC_PROTOCOL_RPC + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+
+ assertTrue("JDBC URL missing jdbc protocol prefix",
+ url.startsWith(prefix) || url.startsWith(zkPrefix) || url.startsWith(masterPrefix)
+ || url.startsWith(rpcPrefix));
assertTrue("JDBC URL missing jdbc terminator suffix", url.endsWith(";"));
+ url = url.replaceAll("\\\\:", "=");
// remove the prefix, should only be left with server[,server...]:port:/znode
- url = url.substring(prefix.length());
String[] splits = url.split(":");
+ splits = Arrays.copyOfRange(splits, 2, splits.length);
assertTrue("zk details should contain at least server component", splits.length >= 1);
// make sure that each server is comma separated
- String[] servers = splits[0].split(",");
+ String[] servers = splits[0].replaceAll("=", "\\\\:").split(",");
for(String server: servers){
assertFalse("Found whitespace in server names for url: " + url, server.contains(" "));
}
- if (splits.length >= 2) {
+ if (splits.length >= 2 && !splits[1].isEmpty()) {
// second bit is a port number, should not throw
try {
Integer.parseInt(splits[1]);