You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2021/07/29 15:50:44 UTC

[phoenix] branch 4.x updated: PHOENIX-6506 : Tenant Connection is not able to access/validate Global Sequences (#1270) (#1261)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 2bdd4a2  PHOENIX-6506 : Tenant Connection is not able to access/validate Global Sequences (#1270) (#1261)
2bdd4a2 is described below

commit 2bdd4a2bb4c58e91a1293bd4afc09f6f33ee8542
Author: Lokesh Khurana <kh...@gmail.com>
AuthorDate: Thu Jul 29 21:20:33 2021 +0530

    PHOENIX-6506 : Tenant Connection is not able to access/validate Global Sequences (#1270) (#1261)
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../org/apache/phoenix/end2end/UpsertValuesIT.java | 148 ++++++++++++++++++++-
 .../phoenix/query/ConnectionQueryServicesImpl.java |  40 +++++-
 2 files changed, 186 insertions(+), 2 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index e89cc58..32662ec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -44,11 +44,15 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.Assert;
 import org.junit.Test;
 
 
@@ -670,5 +674,147 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT {
             assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(3)));
         }
     }
-    
+
+
+    @Test
+    public void testUpsertValueWithDiffSequenceAndConnections() throws Exception {
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            PreparedStatement createTableStatement = conn.prepareStatement(String.format("CREATE TABLE IF NOT EXISTS " +
+                    "%s (SERVICE VARCHAR NOT NULL, SEQUENCE_NUMBER BIGINT NOT NULL , " +
+                    "CONSTRAINT PK PRIMARY KEY (SERVICE, SEQUENCE_NUMBER)) MULTI_TENANT = TRUE", tableName));
+            createTableStatement.execute();
+        }
+
+        testGlobalSequenceUpsertWithTenantConnection(tableName);
+        testGlobalSequenceUpsertWithGlobalConnection(tableName);
+        testTenantSequenceUpsertWithSameTenantConnection(tableName);
+        testTenantSequenceUpsertWithDifferentTenantConnection(tableName);
+        testTenantSequenceUpsertWithGlobalConnection(tableName);
+
+    }
+
+    private void testTenantSequenceUpsertWithGlobalConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = getTenantConnection("PHOENIX")) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+        }
+
+        try (Connection tenantConn = DriverManager.getConnection(getUrl())) {
+            tenantConn.setAutoCommit(true);
+            Statement executeUpdateStatement = tenantConn.createStatement();
+            try {
+                executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SERVICE, SEQUENCE_NUMBER) VALUES " +
+                        "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, sequenceName));
+                Assert.fail();
+            } catch (SequenceNotFoundException e) {
+                assertTrue(true);
+            } catch (Exception e) {
+                Assert.fail();
+            }
+        }
+    }
+
+    private void testTenantSequenceUpsertWithDifferentTenantConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = getTenantConnection("PHOENIX")) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+        }
+
+        try (Connection tenantConn = getTenantConnection("HBASE")) {
+            tenantConn.setAutoCommit(true);
+
+            Statement executeUpdateStatement = tenantConn.createStatement();
+            try {
+                executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+                        "( NEXT VALUE FOR %s)", tableName, sequenceName));
+                Assert.fail();
+            } catch (SequenceNotFoundException e) {
+                assertTrue(true);
+            } catch (Exception e) {
+                Assert.fail();
+            }
+        }
+    }
+
+    private void testTenantSequenceUpsertWithSameTenantConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = getTenantConnection("ZOOKEEPER")) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+            Statement executeUpdateStatement = conn.createStatement();
+            executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+                    "( NEXT VALUE FOR %s)", tableName, sequenceName));
+            ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            assertFalse(rs.next());
+        }
+
+    }
+
+    private void testGlobalSequenceUpsertWithGlobalConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+            Statement executeUpdateStatement = conn.createStatement();
+            executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SERVICE, SEQUENCE_NUMBER) VALUES " +
+                    "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, sequenceName));
+            ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName);
+            assertTrue(rs.next());
+            assertEquals("HBASE", rs.getString(1));
+            assertEquals("1", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("PHOENIX", rs.getString(1));
+            assertEquals("1", rs.getString(2));
+            assertFalse(rs.next());
+        }
+    }
+
+    private void testGlobalSequenceUpsertWithTenantConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+        }
+
+        try (Connection tenantConn = getTenantConnection("HBASE")) {
+            tenantConn.setAutoCommit(true);
+
+            Statement executeUpdateStatement = tenantConn.createStatement();
+            executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+                    "( NEXT VALUE FOR %s)", tableName, sequenceName));
+
+            ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            assertFalse(rs.next());
+
+        }
+    }
+
+    private static Connection getTenantConnection(String tenantId) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(getUrl(), props);
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index a2699b7..e0c252e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4963,7 +4963,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
             SequenceKey key = sequenceAllocation.getSequenceKey();
             Sequence newSequences = new Sequence(key);
-            Sequence sequence = sequenceMap.putIfAbsent(key, newSequences);
+            Sequence sequence = getSequence(sequenceAllocation);
             if (sequence == null) {
                 sequence = newSequences;
             }
@@ -5036,6 +5036,44 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
     }
 
+    /**
+     * checks if sequenceAllocation's sequence there in sequenceMap, also returns Global Sequences
+     * from Tenant sequenceAllocations
+     * @param sequenceAllocation
+     * @return
+     */
+
+    private Sequence getSequence(SequenceAllocation sequenceAllocation) {
+        SequenceKey key = sequenceAllocation.getSequenceKey();
+        if (key.getTenantId() == null) {
+            return sequenceMap.putIfAbsent(key, new Sequence(key));
+        } else {
+            Sequence sequence = sequenceMap.get(key);
+            if (sequence == null) {
+                for (Map.Entry<SequenceKey,Sequence> entry : sequenceMap.entrySet()) {
+                    if (compareSequenceKeysWithoutTenant(key, entry.getKey())) {
+                        return entry.getValue();
+                    }
+                }
+            } else {
+                return sequence;
+            }
+        }
+        return null;
+    }
+
+    private boolean compareSequenceKeysWithoutTenant(SequenceKey keyToCompare, SequenceKey availableKey) {
+        if (availableKey.getTenantId() != null) {
+            return false;
+        }
+        boolean sameSchema = keyToCompare.getSchemaName() == null ? availableKey.getSchemaName() == null :
+                keyToCompare.getSchemaName().equals(availableKey.getSchemaName());
+        if (!sameSchema) {
+            return false;
+        }
+        return keyToCompare.getSequenceName().equals(availableKey.getSequenceName());
+    }
+
     @Override
     public void clearTableFromCache(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
             final long clientTS) throws SQLException {