Posted to commits@phoenix.apache.org by gj...@apache.org on 2021/09/14 19:41:46 UTC

[phoenix] branch 4.x updated: PHOENIX-6541 Use ROW_TIMESTAMP column value as timestamps for conditional upsert mutations (#1300)

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new df6a9d1  PHOENIX-6541 Use ROW_TIMESTAMP column value as timestamps for conditional upsert mutations (#1300)
df6a9d1 is described below

commit df6a9d1f27708df56dc86c357d1a29c30661d431
Author: tkhurana <kh...@gmail.com>
AuthorDate: Tue Sep 14 12:41:38 2021 -0700

    PHOENIX-6541 Use ROW_TIMESTAMP column value as timestamps for conditional upsert mutations (#1300)
---
 .../apache/phoenix/end2end/OnDuplicateKeyIT.java   | 114 ++++++++++++++++++++-
 .../phoenix/hbase/index/IndexRegionObserver.java   |   4 +-
 2 files changed, 115 insertions(+), 3 deletions(-)

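The gist of the change: a primary-key column declared ROW_TIMESTAMP maps its value onto the HBase cell timestamp of the written row, which is what the new assertHBaseRowTimestamp helper below verifies with a raw HBase scan. Before this patch, conditional (ON DUPLICATE KEY, i.e. atomic) upserts always fell into the server-side re-stamping path in IndexRegionObserver, so a timestamp the client supplied through a ROW_TIMESTAMP column was overwritten with the server clock; the small fix at the bottom skips re-stamping an atomic mutation whose timestamp is already set.
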
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
index 0878c07..532a28f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import java.sql.Connection;
 import java.sql.Date;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
@@ -37,10 +38,16 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -745,6 +752,74 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testRowStampCol() throws Exception {
+        // ROW_TIMESTAMP is not supported for tables with indexes
+        if (indexDDL.length() > 0) {
+            return;
+        }
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName +
+                "(\n" +
+                "ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
+                "USER_ID CHAR(15) NOT NULL,\n" +
+                "TIME_STAMP DATE NOT NULL,\n" +
+                "STATUS VARCHAR,\n" +
+                "CONSTRAINT PK PRIMARY KEY \n" +
+                "    (\n" +
+                "        ORGANIZATION_ID, \n" +
+                "        USER_ID,\n" +
+                "        TIME_STAMP ROW_TIMESTAMP\n" + // ROW_TIMESTAMP col
+                "    ) \n" +
+                ")\n";
+
+            conn.createStatement().execute(ddl);
+            String orgid = "ORG1";
+            String userid = "USER1";
+            String original = "ORIGINAL";
+            String updated = "UPDATED";
+            String duplicate = "DUPLICATE";
+            long rowTimestamp = EnvironmentEdgeManager.currentTimeMillis() - 10;
+            String dml = "UPSERT INTO  " + tableName +
+                "(ORGANIZATION_ID, USER_ID, TIME_STAMP, STATUS) VALUES (?, ?, ?, ?)";
+            String ignoreDml = dml + "ON DUPLICATE KEY IGNORE";
+            String updateDml = dml + "ON DUPLICATE KEY UPDATE status='" + duplicate + "'";
+            String nullDml = dml + "ON DUPLICATE KEY UPDATE status = null";
+            String dql = "SELECT count(*) from " + tableName + " WHERE STATUS = ?";
+
+            // row doesn't exist
+            upsertRecord(conn, ignoreDml, orgid, userid, rowTimestamp, original);
+            assertNumRecords(1, conn, dql, original);
+            assertHBaseRowTimestamp(tableName, rowTimestamp);
+
+            // on duplicate key ignore
+            upsertRecord(conn, ignoreDml, orgid, userid, rowTimestamp, updated);
+            assertNumRecords(1, conn, dql, original);
+            assertNumRecords(0, conn, dql, updated);
+            assertHBaseRowTimestamp(tableName, rowTimestamp);
+
+            // regular upsert override
+            upsertRecord(conn, dml, orgid, userid, rowTimestamp, updated);
+            assertNumRecords(0, conn, dql, original);
+            assertNumRecords(1, conn, dql, updated);
+            assertHBaseRowTimestamp(tableName, rowTimestamp);
+
+            // on duplicate key update generates extra mutations on the server but those mutations
+            // don't honor ROW_TIMESTAMP
+            upsertRecord(conn, updateDml, orgid, userid, rowTimestamp, "");
+            assertNumRecords(0, conn, dql, updated);
+            assertNumRecords(1, conn, dql, duplicate);
+
+            // set null, new mutations generated on the server
+            upsertRecord(conn, nullDml, orgid, userid, rowTimestamp, "");
+            assertNumRecords(0, conn, dql, duplicate);
+            dql = "SELECT count(*) from " + tableName + " WHERE STATUS is null";
+            assertNumRecords(1, conn, dql);
+        }
+    }
+
     private void assertRow(Connection conn, String tableName, String expectedPK, int expectedCol1, String expectedCol2) throws SQLException {
         ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
         assertTrue(rs.next());
@@ -754,6 +829,41 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
         assertFalse(rs.next());
     }
 
+    private void upsertRecord(Connection conn, String dml, String orgid, String userid, long ts, String status) throws SQLException {
+        try(PreparedStatement stmt = conn.prepareStatement(dml)) { // regular upsert
+            stmt.setString(1, orgid);
+            stmt.setString(2, userid);
+            stmt.setDate(3, new Date(ts));
+            stmt.setString(4, status); // status should change now
+            stmt.executeUpdate();
+            conn.commit();
+        }
+    }
 
+    private void assertNumRecords(int count, Connection conn, String dql, String... params)
+        throws Exception {
+        PreparedStatement stmt = conn.prepareStatement(dql);
+        int counter = 1;
+        for (String param : params) {
+            stmt.setString(counter++, param);
+        }
+        ResultSet rs = stmt.executeQuery();
+        assertTrue(rs.next());
+        assertEquals(count, rs.getInt(1));
+    }
+
+    private void assertHBaseRowTimestamp(String tableName, long expectedTimestamp) throws Exception {
+        Scan scan = new Scan();
+        byte[] emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(true).getFirst();
+        try (org.apache.hadoop.hbase.client.Connection hconn =
+                 ConnectionFactory.createConnection(config)) {
+            Table table = hconn.getTable(TableName.valueOf(tableName));
+            ResultScanner resultScanner = table.getScanner(scan);
+            Result result = resultScanner.next();
+            long actualTimestamp = result.getColumnLatestCell(
+                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp();
+            assertEquals(expectedTimestamp, actualTimestamp);
+        }
+    }
 }
     
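The test exercises both paths: plain upserts and ON DUPLICATE KEY IGNORE keep the client-supplied TIME_STAMP as the HBase cell timestamp, while the extra mutations the server generates for ON DUPLICATE KEY UPDATE (including the update-to-null case) do not carry it, which is why the last two blocks drop the assertHBaseRowTimestamp call. For reference, a minimal client-side sketch of the pattern, assuming an open Phoenix java.sql.Connection conn and a hypothetical table T (table and column names here are illustrative, not from the patch):

    // Assumed DDL:
    //   CREATE TABLE T (ID CHAR(15) NOT NULL, TS DATE NOT NULL, STATUS VARCHAR,
    //       CONSTRAINT PK PRIMARY KEY (ID, TS ROW_TIMESTAMP))
    long rowTs = System.currentTimeMillis() - 10;
    try (PreparedStatement ps = conn.prepareStatement(
            "UPSERT INTO T (ID, TS, STATUS) VALUES (?, ?, ?) ON DUPLICATE KEY IGNORE")) {
        ps.setString(1, "ID1");
        // Value for the ROW_TIMESTAMP column; with this patch it survives as
        // the HBase cell timestamp even though this is an atomic upsert.
        ps.setDate(2, new java.sql.Date(rowTs));
        ps.setString(3, "ORIGINAL");
        ps.executeUpdate();
        conn.commit();
    }
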
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 96b4f18..8c40def 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -610,7 +610,9 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
             }
             Mutation m = miniBatchOp.getOperation(i);
             // skip this mutation if we aren't enabling indexing or not an atomic op
-            if (!builder.isEnabled(m) && !builder.isAtomicOp(m)) {
+            // or if it is an atomic op and its timestamp is already set (not LATEST)
+            if (!builder.isEnabled(m) &&
+                !(builder.isAtomicOp(m) && getMaxTimestamp(m) == HConstants.LATEST_TIMESTAMP)) {
                 continue;
             }
             setTimestampOnMutation(m, ts);
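
The fix itself is the widened guard above. Previously every atomic (ON DUPLICATE KEY) mutation reached setTimestampOnMutation(m, ts) and was stamped with the server-chosen ts; now, unless indexing is enabled for the mutation, an atomic op whose cells already carry a concrete timestamp (getMaxTimestamp(m) != HConstants.LATEST_TIMESTAMP, i.e. one stamped from a ROW_TIMESTAMP column) is skipped and keeps the client value. A minimal sketch of such a max-timestamp check, written against the public HBase client API as an illustration of the idea rather than the actual body of getMaxTimestamp:

    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Mutation;

    final class TimestampGuard {
        // Highest cell timestamp in the mutation; a client mutation that was
        // never explicitly stamped reports HConstants.LATEST_TIMESTAMP.
        static long maxTimestamp(Mutation m) {
            long max = 0L;
            for (List<Cell> cells : m.getFamilyCellMap().values()) {
                for (Cell cell : cells) {
                    max = Math.max(max, cell.getTimestamp());
                }
            }
            return max;
        }

        // True when the client already assigned a timestamp (for example via
        // a ROW_TIMESTAMP column), so the server should leave it alone.
        static boolean hasClientTimestamp(Mutation m) {
            return maxTimestamp(m) != HConstants.LATEST_TIMESTAMP;
        }
    }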