You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/07/24 20:20:46 UTC
[phoenix] branch master updated: PHOENIX-6995 HA client connections ignore additional jdbc params in t… (#1643)
This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 222dd70ab5 PHOENIX-6995 HA client connections ignore additional jdbc params in t… (#1643)
222dd70ab5 is described below
commit 222dd70ab5da0a090b50533b2cac7e92bf723edc
Author: tkhurana <kh...@gmail.com>
AuthorDate: Mon Jul 24 13:20:41 2023 -0700
PHOENIX-6995 HA client connections ignore additional jdbc params in t… (#1643)
* PHOENIX-6995 HA client connections ignore additional jdbc params in the jdbc string
* Address review comments and fix checkstyle issues
---------
Co-authored-by: Tanuj Khurana <tk...@apache.org>
---
.../jdbc/ClusterRoleRecordGeneratorToolIT.java | 3 ++
.../phoenix/jdbc/FailoverPhoenixConnectionIT.java | 26 ++++++++-----
.../phoenix/jdbc/HighAvailabilityGroupIT.java | 10 ++---
.../jdbc/HighAvailabilityTestingUtility.java | 24 +++++++++---
.../jdbc/HighAvailabilityTestingUtilityIT.java | 2 +-
.../jdbc/ParallelPhoenixConnectionFallbackIT.java | 2 +-
.../phoenix/jdbc/ParallelPhoenixConnectionIT.java | 37 ++++++++++++++++--
.../jdbc/ParallelPhoenixConnectionWorkflowIT.java | 3 +-
.../ParallelPhoenixNullComparingResultSetIT.java | 4 +-
.../apache/phoenix/jdbc/HighAvailabilityGroup.java | 44 ++++++++++++++++++++--
.../phoenix/jdbc/HighAvailabilityPolicy.java | 7 ++--
.../java/org/apache/phoenix/util/JDBCUtil.java | 9 ++++-
12 files changed, 135 insertions(+), 36 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorToolIT.java
index 25cc237424..0cc5048bcc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorToolIT.java
@@ -30,12 +30,14 @@ import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +47,7 @@ import org.slf4j.LoggerFactory;
*
* @see ClusterRoleRecordGeneratorToolTest
*/
+@Category(NeedsOwnMiniClusterTest.class)
public class ClusterRoleRecordGeneratorToolIT {
private static final Logger LOG = LoggerFactory.getLogger(ClusterRoleRecordGeneratorToolIT.class);
private static final HBaseTestingUtilityPair CLUSTERS = new HBaseTestingUtilityPair();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
index f2350697d2..774b8947d8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
@@ -63,7 +63,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,8 +108,8 @@ public class FailoverPhoenixConnectionIT {
// Make first cluster ACTIVE
CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
- haGroup = getHighAvailibilityGroup(CLUSTERS.getJdbcUrl(), clientProperties);
- LOG.info("Initialized haGroup {} with URL {}", haGroup, CLUSTERS.getJdbcUrl());
+ haGroup = getHighAvailibilityGroup(CLUSTERS.getJdbcHAUrl(), clientProperties);
+ LOG.info("Initialized haGroup {} with URL {}", haGroup, CLUSTERS.getJdbcHAUrl());
tableName = testName.getMethodName().toUpperCase();
CLUSTERS.createTableOnClusterPair(tableName);
}
@@ -120,10 +119,10 @@ public class FailoverPhoenixConnectionIT {
try {
haGroup.close();
PhoenixDriver.INSTANCE
- .getConnectionQueryServices(CLUSTERS.getUrl1(), haGroup.getProperties())
+ .getConnectionQueryServices(CLUSTERS.getJdbcUrl1(), haGroup.getProperties())
.close();
PhoenixDriver.INSTANCE
- .getConnectionQueryServices(CLUSTERS.getUrl2(), haGroup.getProperties())
+ .getConnectionQueryServices(CLUSTERS.getJdbcUrl2(), haGroup.getProperties())
.close();
} catch (Exception e) {
LOG.error("Fail to tear down the HA group and the CQS. Will ignore", e);
@@ -253,9 +252,18 @@ public class FailoverPhoenixConnectionIT {
*/
@Test(timeout = 300000)
public void testConnectionWhenActiveZKRestarts() throws Exception {
+ // This creates the cqsi for the active cluster upfront.
+ // If we don't do that then later when we try to transition
+ // the cluster role it tries to create cqsi for the cluster
+ // which is down and that takes forever causing timeouts
+ try (Connection conn = createFailoverConnection()) {
+ doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+ }
doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
try {
- createFailoverConnection();
+ try (Connection conn = createFailoverConnection()) {
+ doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+ }
fail("Should have failed since ACTIVE ZK cluster was shutdown");
} catch (SQLException e) {
LOG.info("Got expected exception when ACTIVE ZK cluster is down", e);
@@ -450,7 +458,7 @@ public class FailoverPhoenixConnectionIT {
CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER);
Properties clientProperties2 = new Properties(clientProperties);
clientProperties2.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
- Connection conn2 = DriverManager.getConnection(CLUSTERS.getJdbcUrl(), clientProperties2);
+ Connection conn2 = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties2);
PhoenixConnection wrappedConn2 = ((FailoverPhoenixConnection) conn2).getWrappedConnection();
assertFalse(wrappedConn.isClosed());
@@ -491,7 +499,7 @@ public class FailoverPhoenixConnectionIT {
return null;
}).when(spy).close();
ConnectionQueryServices cqs = PhoenixDriver.INSTANCE
- .getConnectionQueryServices(CLUSTERS.getUrl1(), clientProperties);
+ .getConnectionQueryServices(CLUSTERS.getJdbcUrl1(), clientProperties);
// replace the wrapped connection with the spied connection in CQS
cqs.removeConnection(wrapped.unwrap(PhoenixConnection.class));
cqs.addConnection(spy.unwrap(PhoenixConnection.class));
@@ -836,7 +844,7 @@ public class FailoverPhoenixConnectionIT {
* Create a failover connection using {@link #clientProperties}.
*/
private Connection createFailoverConnection() throws SQLException {
- return DriverManager.getConnection(CLUSTERS.getJdbcUrl(), clientProperties);
+ return DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties);
}
@FunctionalInterface
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
index 55813980a3..4435236625 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java
@@ -105,7 +105,7 @@ public class HighAvailabilityGroupIT {
// Make first cluster ACTIVE
CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
- jdbcUrl = CLUSTERS.getJdbcUrl();
+ jdbcUrl = CLUSTERS.getJdbcHAUrl();
haGroup = getHighAvailibilityGroup(jdbcUrl,clientProperties);
}
@@ -114,10 +114,10 @@ public class HighAvailabilityGroupIT {
haGroup.close();
try {
PhoenixDriver.INSTANCE
- .getConnectionQueryServices(CLUSTERS.getUrl1(), haGroup.getProperties())
+ .getConnectionQueryServices(CLUSTERS.getJdbcUrl1(), haGroup.getProperties())
.close();
PhoenixDriver.INSTANCE
- .getConnectionQueryServices(CLUSTERS.getUrl2(), haGroup.getProperties())
+ .getConnectionQueryServices(CLUSTERS.getJdbcUrl2(), haGroup.getProperties())
.close();
} catch (Exception e) {
LOG.error("Fail to tear down the HA group and the CQS. Will ignore", e);
@@ -302,7 +302,7 @@ public class HighAvailabilityGroupIT {
*/
@Test
public void testGetShouldFailWithNonHAJdbcString() {
- final String oldJdbcString = String.format("jdbc:phoenix:%s", CLUSTERS.getUrl1());
+ final String oldJdbcString = CLUSTERS.getJdbcUrl1();
try {
HighAvailabilityGroup.get(oldJdbcString, clientProperties);
fail("Should have failed with invalid connection string '" + oldJdbcString + "'");
@@ -362,7 +362,7 @@ public class HighAvailabilityGroupIT {
CLUSTERS.createTableOnClusterPair(tableName);
final String url1 = CLUSTERS.getUrl1();
- final String jdbcUrlToCluster1 = "jdbc:phoenix:" + url1;
+ final String jdbcUrlToCluster1 = CLUSTERS.getJdbcUrl1();
doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
try {
DriverManager.getConnection(jdbcUrlToCluster1);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
index f3de566792..d604774e50 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.jdbc;
import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.RandomUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -85,6 +86,8 @@ public class HighAvailabilityTestingUtility {
private PhoenixHAAdminHelper haAdmin2;
private Admin admin1;
private Admin admin2;
+ @VisibleForTesting
+ static final String PRINCIPAL = "USER_FOO";
public HBaseTestingUtilityPair() {
Configuration conf1 = hbaseCluster1.getConfiguration();
@@ -243,7 +246,7 @@ public class HighAvailabilityTestingUtility {
public Connection getClusterConnection(int clusterIndex) throws SQLException {
String clusterUrl = clusterIndex == 1 ? getUrl1() : getUrl2();
Properties props = new Properties();
- String url = String.format("jdbc:phoenix:%s", clusterUrl);
+ String url = getJdbcUrl(clusterUrl);
return DriverManager.getConnection(url, props);
}
@@ -339,8 +342,19 @@ public class HighAvailabilityTestingUtility {
/**
* @return the JDBC connection URL for this pair of HBase cluster in the HA format
*/
- public String getJdbcUrl() {
- return String.format("jdbc:phoenix:[%s|%s]", url1, url2);
+ public String getJdbcHAUrl() {
+ return getJdbcUrl(String.format("[%s|%s]", url1, url2));
+ }
+
+ public String getJdbcUrl1() {
+ return getJdbcUrl(url1);
+ }
+ public String getJdbcUrl2() {
+ return getJdbcUrl(url2);
+ }
+
+ public String getJdbcUrl(String zkUrl) {
+ return String.format("jdbc:phoenix:%s:%s", zkUrl, PRINCIPAL);
}
public String getUrl1() {
@@ -395,7 +409,7 @@ public class HighAvailabilityTestingUtility {
public void createTableOnClusterPair(String tableName, boolean replicationScope)
throws SQLException {
for (String url : Arrays.asList(getUrl1(), getUrl2())) {
- String jdbcUrl = String.format("jdbc:phoenix:%s", url);
+ String jdbcUrl = getJdbcUrl(url);
try (Connection conn = DriverManager.getConnection(jdbcUrl, new Properties())) {
conn.createStatement().execute(String.format(
"CREATE TABLE IF NOT EXISTS %s (\n"
@@ -426,7 +440,7 @@ public class HighAvailabilityTestingUtility {
*/
public void createTenantSpecificTable(String tableName) throws SQLException {
for (String url : Arrays.asList(getUrl1(), getUrl2())) {
- String jdbcUrl = String.format("jdbc:phoenix:%s", url);
+ String jdbcUrl = getJdbcUrl(url);
try (Connection conn = DriverManager.getConnection(jdbcUrl, new Properties())) {
conn.createStatement().execute(String.format(
"CREATE TABLE IF NOT EXISTS %s (\n"
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtilityIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtilityIT.java
index a2ad26384f..f7427050ea 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtilityIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtilityIT.java
@@ -131,7 +131,7 @@ public class HighAvailabilityTestingUtilityIT {
Properties properties = HighAvailabilityTestingUtility.getHATestProperties();
properties.setProperty(PHOENIX_HA_GROUP_ATTR, testName.getMethodName());
ConnectionQueryServices cqs = PhoenixDriver.INSTANCE.getConnectionQueryServices(
- CLUSTERS.getUrl1(), properties);
+ CLUSTERS.getJdbcUrl1(), properties);
fail("Should have failed since the target cluster is down, but got a CQS: " + cqs);
} catch (Exception e) {
LOG.info("Got expected exception since target cluster is down:", e);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java
index e081b47d16..e59fc18fd8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java
@@ -69,7 +69,7 @@ public class ParallelPhoenixConnectionFallbackIT {
// Make first cluster ACTIVE
CLUSTERS.initClusterRole(haGroupName, PARALLEL);
- jdbcUrl = String.format("jdbc:phoenix:[%s|%s]", CLUSTERS.getUrl1(), CLUSTERS.getUrl2());
+ jdbcUrl = CLUSTERS.getJdbcHAUrl();
haGroup = HighAvailabilityTestingUtility.getHighAvailibilityGroup(jdbcUrl, PROPERTIES);
LOG.info("Initialized haGroup {} with URL {}", haGroup.getGroupInfo().getName(), jdbcUrl);
CLUSTERS.createTableOnClusterPair(tableName);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index 19b4cd5c20..14127f111e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -58,9 +58,11 @@ import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.monitoring.GlobalMetric;
import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -114,8 +116,8 @@ public class ParallelPhoenixConnectionIT {
// Make first cluster ACTIVE
CLUSTERS.initClusterRole(haGroupName, PARALLEL);
- haGroup = HighAvailabilityTestingUtility.getHighAvailibilityGroup(CLUSTERS.getJdbcUrl(), clientProperties);
- LOG.info("Initialized haGroup {} with URL {}", haGroup, CLUSTERS.getJdbcUrl());
+ haGroup = HighAvailabilityTestingUtility.getHighAvailibilityGroup(CLUSTERS.getJdbcHAUrl(), clientProperties);
+ LOG.info("Initialized haGroup {} with URL {}", haGroup, CLUSTERS.getJdbcHAUrl());
tableName = testName.getMethodName();
CLUSTERS.createTableOnClusterPair(tableName);
}
@@ -130,6 +132,35 @@ public class ParallelPhoenixConnectionIT {
}
}
+ @Test
+ public void testUserPrincipal() throws Exception {
+ try (Connection conn = getParallelConnection()) {
+ ParallelPhoenixConnection pr = conn.unwrap(ParallelPhoenixConnection.class);
+ ParallelPhoenixContext context = pr.getContext();
+ HighAvailabilityGroup.HAGroupInfo group = context.getHaGroup().getGroupInfo();
+ if (CLUSTERS.getUrl1().compareTo(CLUSTERS.getUrl2()) <= 0) {
+ Assert.assertEquals(CLUSTERS.getJdbcUrl1(), group.getJDBCUrl1());
+ Assert.assertEquals(CLUSTERS.getJdbcUrl2(), group.getJDBCUrl2());
+ } else {
+ Assert.assertEquals(CLUSTERS.getJdbcUrl2(), group.getJDBCUrl1());
+ Assert.assertEquals(CLUSTERS.getJdbcUrl1(), group.getJDBCUrl2());
+ }
+ ConnectionQueryServices cqsi;
+ // verify connection#1
+ cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl1(), clientProperties);
+ Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsi.getUserName());
+ PhoenixConnection pConn = pr.getFutureConnection1().get();
+ ConnectionQueryServices cqsiFromConn = pConn.getQueryServices();
+ Assert.assertTrue(cqsi == cqsiFromConn);
+ // verify connection#2
+ cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(group.getJDBCUrl2(), clientProperties);
+ Assert.assertEquals(HBaseTestingUtilityPair.PRINCIPAL, cqsi.getUserName());
+ pConn = pr.getFutureConnection2().get();
+ cqsiFromConn = pConn.getQueryServices();
+ Assert.assertTrue(cqsi == cqsiFromConn);
+ }
+ }
+
/**
* Test Phoenix connection creation and basic operations with HBase cluster(s) unavailable.
*/
@@ -513,7 +544,7 @@ public class ParallelPhoenixConnectionIT {
* @throws SQLException
*/
private Connection getParallelConnection() throws SQLException {
- return DriverManager.getConnection(CLUSTERS.getJdbcUrl(), clientProperties);
+ return DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties);
}
void waitForCompletion(Connection conn) throws Exception {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionWorkflowIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionWorkflowIT.java
index b0c6dc2f75..274aaf32ac 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionWorkflowIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionWorkflowIT.java
@@ -200,8 +200,7 @@ public class ParallelPhoenixConnectionWorkflowIT {
// Make first cluster ACTIVE
CLUSTERS.initClusterRole(haGroupName, PARALLEL);
- jdbcUrl = String.format("jdbc:phoenix:[%s|%s]",
- CLUSTERS.getUrl1(), CLUSTERS.getUrl2());
+ jdbcUrl = CLUSTERS.getJdbcHAUrl();
haGroup = HighAvailabilityTestingUtility.getHighAvailibilityGroup(jdbcUrl, clientProperties);
LOG.info("Initialized haGroup {} with URL {}", haGroup.getGroupInfo().getName(), jdbcUrl);
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetIT.java
index a3d89cdd97..271db0232f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixNullComparingResultSetIT.java
@@ -81,7 +81,7 @@ public class ParallelPhoenixNullComparingResultSetIT {
// Make first cluster ACTIVE
CLUSTERS.initClusterRole(haGroupName, PARALLEL);
- jdbcUrl = CLUSTERS.getJdbcUrl();
+ jdbcUrl = CLUSTERS.getJdbcHAUrl();
haGroup = HighAvailabilityTestingUtility.getHighAvailibilityGroup(jdbcUrl, PROPERTIES);
LOG.info("Initialized haGroup {} with URL {}", haGroup.getGroupInfo().getName(), jdbcUrl);
CLUSTERS.createTableOnClusterPair(tableName, false);
@@ -164,7 +164,7 @@ public class ParallelPhoenixNullComparingResultSetIT {
}
private void addRowToCluster(String url, String tableName, int id, int v) throws SQLException {
- String jdbcUrl = String.format("jdbc:phoenix:%s", url);
+ String jdbcUrl = CLUSTERS.getJdbcUrl(url);
try (Connection conn = DriverManager.getConnection(jdbcUrl, PROPERTIES)) {
Statement stmt = conn.createStatement();
stmt.executeUpdate(String.format("UPSERT INTO %s VALUES(%d, %d)", tableName, id, v));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
index 9a0eefd547..75ac6cba82 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -206,6 +207,14 @@ public class HighAvailabilityGroup {
.build()
.buildException();
}
+ String additionalJDBCParams = null;
+ int idx = url.indexOf("]");
+ int extraIdx = url.indexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR, idx + 1);
+ if (extraIdx != -1) {
+ // skip the JDBC_PROTOCOL_SEPARATOR
+ additionalJDBCParams = url.substring(extraIdx + 1);
+ }
+
url = url.substring(url.indexOf("[") + 1, url.indexOf("]"));
String[] urls = url.split("\\|");
@@ -216,7 +225,7 @@ public class HighAvailabilityGroup {
.build()
.buildException();
}
- return new HAGroupInfo(name, urls[0], urls[1]);
+ return new HAGroupInfo(name, urls[0], urls[1], additionalJDBCParams);
}
/**
@@ -578,7 +587,8 @@ public class HighAvailabilityGroup {
}
Preconditions.checkArgument(url.equals(info.getUrl1()) || url.equals(info.getUrl2()),
"The URL '" + url + "' does not belong to this HA group " + info);
- String jdbcString = String.format("jdbc:phoenix:%s", url);
+
+ String jdbcString = info.getJDBCUrl(url);
ClusterRole role = roleRecord.getRole(url);
if (!role.canConnect()) {
@@ -741,8 +751,9 @@ public class HighAvailabilityGroup {
static final class HAGroupInfo {
private final String name;
private final PairOfSameType<String> urls;
+ private final String additionalJDBCParams;
- HAGroupInfo(String name, String url1, String url2) {
+ HAGroupInfo(String name, String url1, String url2, String additionalJDBCParams) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(url1);
Preconditions.checkNotNull(url2);
@@ -756,6 +767,11 @@ public class HighAvailabilityGroup {
} else {
this.urls = new PairOfSameType<>(url1, url2);
}
+ this.additionalJDBCParams = additionalJDBCParams;
+ }
+
+ HAGroupInfo(String name, String url1, String url2) {
+ this(name, url1, url2, null);
}
public String getName() {
@@ -770,6 +786,28 @@ public class HighAvailabilityGroup {
return urls.getSecond();
}
+ public String getJDBCUrl(String zkUrl) {
+ Preconditions.checkArgument(zkUrl.equals(getUrl1()) || zkUrl.equals(getUrl2()),
+ "The URL '" + zkUrl + "' does not belong to this HA group " + this);
+ StringBuilder sb = new StringBuilder();
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL);
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ sb.append(zkUrl);
+ if (!Strings.isNullOrEmpty(additionalJDBCParams)) {
+ sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR);
+ sb.append(additionalJDBCParams);
+ }
+ return sb.toString();
+ }
+
+ public String getJDBCUrl1() {
+ return getJDBCUrl(getUrl1());
+ }
+
+ public String getJDBCUrl2() {
+ return getJDBCUrl(getUrl2());
+ }
+
/**
* Helper method to return the znode path in the Phoenix HA namespace.
*/
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
index 287bc9e279..93d2661864 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
@@ -66,8 +66,8 @@ enum HighAvailabilityPolicy {
zkUrl, haGroup.getGroupInfo());
ConnectionQueryServices cqs = null;
try {
- cqs = PhoenixDriver.INSTANCE.getConnectionQueryServices(zkUrl,
- haGroup.getProperties());
+ cqs = PhoenixDriver.INSTANCE.getConnectionQueryServices(
+ haGroup.getGroupInfo().getJDBCUrl(zkUrl), haGroup.getProperties());
cqs.closeAllConnections(new SQLExceptionInfo
.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
.setMessage("Phoenix connection got closed due to failover")
@@ -88,7 +88,8 @@ enum HighAvailabilityPolicy {
throws SQLException {
// Invalidate CQS cache if any that has been closed but has not been cleared
LOG.info("invalidating cqs cache for zkUrl: " + zkUrl);
- PhoenixDriver.INSTANCE.invalidateCache(zkUrl, haGroup.getProperties());
+ PhoenixDriver.INSTANCE.invalidateCache(haGroup.getGroupInfo().getJDBCUrl(zkUrl),
+ haGroup.getProperties());
}
},
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index 28c8e9e2b5..848154f740 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -189,7 +189,7 @@ public class JDBCUtil {
public static Consistency getConsistencyLevel(String url, Properties info, String defaultValue) {
String consistency = findProperty(url, info, PhoenixRuntime.CONSISTENCY_ATTRIB);
- if(consistency != null && consistency.equalsIgnoreCase(Consistency.TIMELINE.toString())){
+ if (consistency != null && consistency.equalsIgnoreCase(Consistency.TIMELINE.toString())){
return Consistency.TIMELINE;
}
@@ -209,6 +209,8 @@ public class JDBCUtil {
/**
* Formats a zkUrl which includes the zkQuroum of the jdbc url and the rest to sort the zk quorum hosts.
* Example input zkUrl "zk1.net,zk2.net,zk3.net:2181:/hbase"
+ * Example input zkUrl "zk1.net,zk2.net,zk3.net:2181:/hbase:user_foo"
+ * Returns: zk1.net,zk2.net,zk3.net:2181:/hbase
*/
//TODO: Adjust for non-zkurl
public static String formatZookeeperUrl(String zkUrl){
@@ -219,7 +221,10 @@ public class JDBCUtil {
Preconditions.checkArgument(hosts.length > 0,"Unexpected zk url format no hosts found.");
String hostsStrings = Arrays.stream(hosts).sorted().collect(Collectors.joining(","));
components[0] = hostsStrings;
- return Arrays.stream(components).collect(Collectors.joining(":"));
+ // host:port:path:principal
+ // additional arguments passed in url, strip them out
+ int endIdx = Integer.min(components.length, 3);
+ return Arrays.stream(components, 0, endIdx).collect(Collectors.joining(":"));
}
}