You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2021/07/29 15:50:44 UTC
[phoenix] branch 4.x updated: PHOENIX-6506 : Tenant Connection is
not able to access/validate Global Sequences (#1270) (#1261)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 2bdd4a2 PHOENIX-6506 : Tenant Connection is not able to access/validate Global Sequences (#1270) (#1261)
2bdd4a2 is described below
commit 2bdd4a2bb4c58e91a1293bd4afc09f6f33ee8542
Author: Lokesh Khurana <kh...@gmail.com>
AuthorDate: Thu Jul 29 21:20:33 2021 +0530
PHOENIX-6506 : Tenant Connection is not able to access/validate Global Sequences (#1270) (#1261)
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../org/apache/phoenix/end2end/UpsertValuesIT.java | 148 ++++++++++++++++++++-
.../phoenix/query/ConnectionQueryServicesImpl.java | 40 +++++-
2 files changed, 186 insertions(+), 2 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index e89cc58..32662ec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -44,11 +44,15 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.Assert;
import org.junit.Test;
@@ -670,5 +674,147 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT {
assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(3)));
}
}
-
+
+
+    /**
+     * Exercises sequence resolution inside UPSERT VALUES for global and tenant-owned sequences
+     * used from global and tenant connections, all against one shared multi-tenant table.
+     * <p>
+     * The scenarios run in a fixed order: the row assertions of later scenarios rely on the
+     * rows that earlier scenarios have already written to the shared table.
+     */
+    @Test
+    public void testUpsertValueWithDiffSequenceAndConnections() throws Exception {
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            // Close the statement deterministically instead of leaving it open.
+            try (PreparedStatement createTableStatement = conn.prepareStatement(String.format(
+                    "CREATE TABLE IF NOT EXISTS " +
+                    "%s (SERVICE VARCHAR NOT NULL, SEQUENCE_NUMBER BIGINT NOT NULL , " +
+                    "CONSTRAINT PK PRIMARY KEY (SERVICE, SEQUENCE_NUMBER)) MULTI_TENANT = TRUE",
+                    tableName))) {
+                createTableStatement.execute();
+            }
+        }
+
+        testGlobalSequenceUpsertWithTenantConnection(tableName);
+        testGlobalSequenceUpsertWithGlobalConnection(tableName);
+        testTenantSequenceUpsertWithSameTenantConnection(tableName);
+        testTenantSequenceUpsertWithDifferentTenantConnection(tableName);
+        testTenantSequenceUpsertWithGlobalConnection(tableName);
+    }
+
+    /**
+     * Creates a sequence through a tenant connection (tenant "PHOENIX") and verifies that a
+     * global connection cannot use it in an UPSERT.
+     *
+     * @param tableName multi-tenant table the UPSERT targets
+     * @throws Exception on any failure other than the expected SequenceNotFoundException
+     */
+    private void testTenantSequenceUpsertWithGlobalConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection tenantConn = getTenantConnection("PHOENIX")) {
+            tenantConn.setAutoCommit(true);
+            try (PreparedStatement createSequenceStatement = tenantConn.prepareStatement(
+                    String.format("CREATE SEQUENCE " + "IF NOT EXISTS %s", sequenceName))) {
+                createSequenceStatement.execute();
+            }
+        }
+
+        // This connection carries no tenant id, so the tenant-owned sequence must not resolve.
+        try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+            globalConn.setAutoCommit(true);
+            try (Statement executeUpdateStatement = globalConn.createStatement()) {
+                executeUpdateStatement.execute(String.format(
+                        "UPSERT INTO %s ( SERVICE, SEQUENCE_NUMBER) VALUES " +
+                        "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, sequenceName));
+                Assert.fail("Expected SequenceNotFoundException for a tenant-owned sequence "
+                        + "used from a global connection");
+            } catch (SequenceNotFoundException expected) {
+                // Expected outcome. Any other exception is left to propagate so the test
+                // still fails, but with the original cause and stack trace intact.
+            }
+        }
+    }
+
+    /**
+     * Creates a sequence through the connection of tenant "PHOENIX" and verifies that a
+     * connection of a different tenant ("HBASE") cannot use it in an UPSERT.
+     *
+     * @param tableName multi-tenant table the UPSERT targets
+     * @throws Exception on any failure other than the expected SequenceNotFoundException
+     */
+    private void testTenantSequenceUpsertWithDifferentTenantConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection ownerConn = getTenantConnection("PHOENIX")) {
+            ownerConn.setAutoCommit(true);
+            try (PreparedStatement createSequenceStatement = ownerConn.prepareStatement(
+                    String.format("CREATE SEQUENCE " + "IF NOT EXISTS %s", sequenceName))) {
+                createSequenceStatement.execute();
+            }
+        }
+
+        try (Connection tenantConn = getTenantConnection("HBASE")) {
+            tenantConn.setAutoCommit(true);
+
+            try (Statement executeUpdateStatement = tenantConn.createStatement()) {
+                executeUpdateStatement.execute(String.format(
+                        "UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+                        "( NEXT VALUE FOR %s)", tableName, sequenceName));
+                Assert.fail("Expected SequenceNotFoundException for a sequence owned by "
+                        + "another tenant");
+            } catch (SequenceNotFoundException expected) {
+                // Expected outcome. Any other exception is left to propagate so the test
+                // still fails, but with the original cause and stack trace intact.
+            }
+        }
+    }
+
+    /**
+     * Creates a sequence through the connection of tenant "ZOOKEEPER", uses it in an UPSERT on
+     * that same connection, and checks that the connection then sees exactly one row whose
+     * first column reads "1".
+     *
+     * @param tableName multi-tenant table the UPSERT targets
+     * @throws Exception if any statement fails
+     */
+    private void testTenantSequenceUpsertWithSameTenantConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = getTenantConnection("ZOOKEEPER")) {
+            conn.setAutoCommit(true);
+            try (PreparedStatement createSequenceStatement = conn.prepareStatement(
+                    String.format("CREATE SEQUENCE " + "IF NOT EXISTS %s", sequenceName))) {
+                createSequenceStatement.execute();
+            }
+            // Statement and result set are closed explicitly rather than left open.
+            try (Statement executeUpdateStatement = conn.createStatement()) {
+                executeUpdateStatement.execute(String.format(
+                        "UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+                        "( NEXT VALUE FOR %s)", tableName, sequenceName));
+                try (ResultSet rs =
+                        executeUpdateStatement.executeQuery("select * from " + tableName)) {
+                    assertTrue(rs.next());
+                    assertEquals("1", rs.getString(1));
+                    assertFalse(rs.next());
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates a sequence through a global connection, uses it in an UPSERT on that same
+     * connection, and checks the rows visible to the global connection.
+     * <p>
+     * The assertions expect an ("HBASE", "1") row ahead of the ("PHOENIX", "1") row written
+     * here, i.e. they rely on the "HBASE" tenant scenario having already upserted into the
+     * same table before this method runs.
+     *
+     * @param tableName multi-tenant table the UPSERT targets
+     * @throws Exception if any statement fails
+     */
+    private void testGlobalSequenceUpsertWithGlobalConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            try (PreparedStatement createSequenceStatement = conn.prepareStatement(
+                    String.format("CREATE SEQUENCE " + "IF NOT EXISTS %s", sequenceName))) {
+                createSequenceStatement.execute();
+            }
+            // Statement and result set are closed explicitly rather than left open.
+            try (Statement executeUpdateStatement = conn.createStatement()) {
+                executeUpdateStatement.execute(String.format(
+                        "UPSERT INTO %s ( SERVICE, SEQUENCE_NUMBER) VALUES " +
+                        "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, sequenceName));
+                try (ResultSet rs =
+                        executeUpdateStatement.executeQuery("select * from " + tableName)) {
+                    assertTrue(rs.next());
+                    assertEquals("HBASE", rs.getString(1));
+                    assertEquals("1", rs.getString(2));
+                    assertTrue(rs.next());
+                    assertEquals("PHOENIX", rs.getString(1));
+                    assertEquals("1", rs.getString(2));
+                    assertFalse(rs.next());
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates a sequence through a global connection and verifies that a tenant connection
+     * (tenant "HBASE") can use it in an UPSERT: afterwards the tenant connection sees exactly
+     * one row whose first column reads "1".
+     *
+     * @param tableName multi-tenant table the UPSERT targets
+     * @throws Exception if any statement fails
+     */
+    private void testGlobalSequenceUpsertWithTenantConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+            globalConn.setAutoCommit(true);
+            try (PreparedStatement createSequenceStatement = globalConn.prepareStatement(
+                    String.format("CREATE SEQUENCE " + "IF NOT EXISTS %s", sequenceName))) {
+                createSequenceStatement.execute();
+            }
+        }
+
+        try (Connection tenantConn = getTenantConnection("HBASE")) {
+            tenantConn.setAutoCommit(true);
+
+            // Statement and result set are closed explicitly rather than left open.
+            try (Statement executeUpdateStatement = tenantConn.createStatement()) {
+                executeUpdateStatement.execute(String.format(
+                        "UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+                        "( NEXT VALUE FOR %s)", tableName, sequenceName));
+
+                try (ResultSet rs =
+                        executeUpdateStatement.executeQuery("select * from " + tableName)) {
+                    assertTrue(rs.next());
+                    assertEquals("1", rs.getString(1));
+                    assertFalse(rs.next());
+                }
+            }
+        }
+    }
+
+    /**
+     * Opens a connection to the test URL with the tenant id property set.
+     *
+     * @param tenantId value stored under {@code PhoenixRuntime.TENANT_ID_ATTRIB}
+     * @return a new connection; the caller is responsible for closing it
+     * @throws Exception if the connection cannot be opened
+     */
+    private static Connection getTenantConnection(String tenantId) throws Exception {
+        // Work on a copy so the shared test properties are not modified.
+        Properties tenantProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        Connection tenantConnection = DriverManager.getConnection(getUrl(), tenantProps);
+        return tenantConnection;
+    }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index a2699b7..e0c252e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4963,7 +4963,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
SequenceKey key = sequenceAllocation.getSequenceKey();
Sequence newSequences = new Sequence(key);
- Sequence sequence = sequenceMap.putIfAbsent(key, newSequences);
+ Sequence sequence = getSequence(sequenceAllocation);
if (sequence == null) {
sequence = newSequences;
}
@@ -5036,6 +5036,44 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
+    /**
+     * Resolves the {@link Sequence} held in {@code sequenceMap} for the given allocation.
+     * <p>
+     * A key without a tenant id is registered in {@code sequenceMap} on first use. A key that
+     * carries a tenant id is looked up as is and, failing that, matched by schema name and
+     * sequence name against the tenant-less entries of {@code sequenceMap}, which lets a
+     * tenant allocation resolve to a global sequence.
+     *
+     * @param sequenceAllocation allocation whose sequence key is to be resolved
+     * @return the instance held in {@code sequenceMap} for the allocation, or {@code null} when
+     *         the key carries a tenant id and neither that key nor a matching tenant-less key
+     *         is present in {@code sequenceMap}
+     */
+    private Sequence getSequence(SequenceAllocation sequenceAllocation) {
+        SequenceKey key = sequenceAllocation.getSequenceKey();
+        if (key.getTenantId() == null) {
+            // putIfAbsent returns null when it performs the insert. Return the inserted
+            // instance in that case, so the caller operates on the object that is actually
+            // cached instead of substituting a separate, uncached Sequence.
+            Sequence candidate = new Sequence(key);
+            Sequence existing = sequenceMap.putIfAbsent(key, candidate);
+            return existing != null ? existing : candidate;
+        }
+
+        Sequence sequence = sequenceMap.get(key);
+        if (sequence != null) {
+            return sequence;
+        }
+
+        // No entry under the tenant key: fall back to a tenant-less entry with the same
+        // schema and sequence name, if one is cached.
+        for (Map.Entry<SequenceKey, Sequence> entry : sequenceMap.entrySet()) {
+            if (compareSequenceKeysWithoutTenant(key, entry.getKey())) {
+                return entry.getValue();
+            }
+        }
+
+        // NOTE(review): a tenant key with no match is not added to sequenceMap here, so the
+        // caller's fallback instance stays uncached - confirm this is intended.
+        return null;
+    }
+
+    /**
+     * Tells whether {@code availableKey} is a tenant-less key naming the same schema and
+     * sequence as {@code keyToCompare}; the tenant id of {@code keyToCompare} is ignored.
+     *
+     * @param keyToCompare key being resolved
+     * @param availableKey candidate key to test
+     * @return {@code true} only if {@code availableKey} has no tenant id and both schema name
+     *         (null matching null) and sequence name are equal
+     */
+    private boolean compareSequenceKeysWithoutTenant(SequenceKey keyToCompare, SequenceKey availableKey) {
+        // Only tenant-less candidates qualify.
+        if (availableKey.getTenantId() != null) {
+            return false;
+        }
+        // Schema names must agree, treating two nulls as equal.
+        if (keyToCompare.getSchemaName() == null) {
+            if (availableKey.getSchemaName() != null) {
+                return false;
+            }
+        } else if (!keyToCompare.getSchemaName().equals(availableKey.getSchemaName())) {
+            return false;
+        }
+        return keyToCompare.getSequenceName().equals(availableKey.getSequenceName());
+    }
+
@Override
public void clearTableFromCache(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
final long clientTS) throws SQLException {