You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2021/09/14 19:41:46 UTC
[phoenix] branch 4.x updated: PHOENIX-6541 Use ROW_TIMESTAMP column value as timestamps for conditional upsert mutations (#1300)
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new df6a9d1 PHOENIX-6541 Use ROW_TIMESTAMP column value as timestamps for conditional upsert mutations (#1300)
df6a9d1 is described below
commit df6a9d1f27708df56dc86c357d1a29c30661d431
Author: tkhurana <kh...@gmail.com>
AuthorDate: Tue Sep 14 12:41:38 2021 -0700
PHOENIX-6541 Use ROW_TIMESTAMP column value as timestamps for conditional upsert mutations (#1300)
---
.../apache/phoenix/end2end/OnDuplicateKeyIT.java | 114 ++++++++++++++++++++-
.../phoenix/hbase/index/IndexRegionObserver.java | 4 +-
2 files changed, 115 insertions(+), 3 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
index 0878c07..532a28f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
@@ -37,10 +38,16 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -745,6 +752,74 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
}
}
+    /**
+     * Checks conditional upserts against a table whose primary key ends in a ROW_TIMESTAMP
+     * column. After the initial insert, after an ignored duplicate and after a plain upsert
+     * override, the row's empty key value cell must carry the ROW_TIMESTAMP value as its HBase
+     * timestamp. ON DUPLICATE KEY UPDATE (including setting a column to null) is only checked
+     * for its resulting STATUS value.
+     */
+    @Test
+    public void testRowStampCol() throws Exception {
+        // ROW_TIMESTAMP is not supported for tables with indexes
+        if (indexDDL.length() > 0) {
+            return;
+        }
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tableName +
+                    "(\n" +
+                    "ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
+                    "USER_ID CHAR(15) NOT NULL,\n" +
+                    "TIME_STAMP DATE NOT NULL,\n" +
+                    "STATUS VARCHAR,\n" +
+                    "CONSTRAINT PK PRIMARY KEY \n" +
+                    " (\n" +
+                    " ORGANIZATION_ID, \n" +
+                    " USER_ID,\n" +
+                    " TIME_STAMP ROW_TIMESTAMP\n" + // ROW_TIMESTAMP col
+                    " ) \n" +
+                    ")\n";
+
+            conn.createStatement().execute(ddl);
+            String orgid = "ORG1";
+            String userid = "USER1";
+            String original = "ORIGINAL";
+            String updated = "UPDATED";
+            String duplicate = "DUPLICATE";
+            // Presumably kept slightly in the past so it cannot coincide with a
+            // server-assigned "now" timestamp.
+            long rowTimestamp = EnvironmentEdgeManager.currentTimeMillis() - 10;
+            String dml = "UPSERT INTO " + tableName +
+                    "(ORGANIZATION_ID, USER_ID, TIME_STAMP, STATUS) VALUES (?, ?, ?, ?)";
+            // Each suffix starts with a space so the clause is not glued to the closing ")".
+            String ignoreDml = dml + " ON DUPLICATE KEY IGNORE";
+            String updateDml = dml + " ON DUPLICATE KEY UPDATE status='" + duplicate + "'";
+            String nullDml = dml + " ON DUPLICATE KEY UPDATE status = null";
+            String statusDql = "SELECT count(*) from " + tableName + " WHERE STATUS = ?";
+            String nullStatusDql = "SELECT count(*) from " + tableName + " WHERE STATUS is null";
+
+            // row doesn't exist
+            upsertRecord(conn, ignoreDml, orgid, userid, rowTimestamp, original);
+            assertNumRecords(1, conn, statusDql, original);
+            assertHBaseRowTimestamp(tableName, rowTimestamp);
+
+            // on duplicate key ignore: existing row and its cell timestamp are left untouched
+            upsertRecord(conn, ignoreDml, orgid, userid, rowTimestamp, updated);
+            assertNumRecords(1, conn, statusDql, original);
+            assertNumRecords(0, conn, statusDql, updated);
+            assertHBaseRowTimestamp(tableName, rowTimestamp);
+
+            // regular upsert override
+            upsertRecord(conn, dml, orgid, userid, rowTimestamp, updated);
+            assertNumRecords(0, conn, statusDql, original);
+            assertNumRecords(1, conn, statusDql, updated);
+            assertHBaseRowTimestamp(tableName, rowTimestamp);
+
+            // on duplicate key update generates extra mutations on the server but those mutations
+            // don't honor ROW_TIMESTAMP, so only the resulting STATUS is checked. The bound STATUS
+            // value ("") is not expected to land: the UPDATE clause sets it to DUPLICATE.
+            upsertRecord(conn, updateDml, orgid, userid, rowTimestamp, "");
+            assertNumRecords(0, conn, statusDql, updated);
+            assertNumRecords(1, conn, statusDql, duplicate);
+
+            // set null, new mutations generated on the server
+            upsertRecord(conn, nullDml, orgid, userid, rowTimestamp, "");
+            assertNumRecords(0, conn, statusDql, duplicate);
+            assertNumRecords(1, conn, nullStatusDql);
+        }
+    }
+
private void assertRow(Connection conn, String tableName, String expectedPK, int expectedCol1, String expectedCol2) throws SQLException {
ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
assertTrue(rs.next());
@@ -754,6 +829,41 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
assertFalse(rs.next());
}
+ private void upsertRecord(Connection conn, String dml, String orgid, String userid, long ts, String status) throws SQLException {
+ try(PreparedStatement stmt = conn.prepareStatement(dml)) { // regular upsert
+ stmt.setString(1, orgid);
+ stmt.setString(2, userid);
+ stmt.setDate(3, new Date(ts));
+ stmt.setString(4, status); // status should change now
+ stmt.executeUpdate();
+ conn.commit();
+ }
+ }
+    /**
+     * Runs a {@code count(*)} style query and asserts the value in the first column of its
+     * first row.
+     *
+     * @param count  expected count
+     * @param conn   connection to run the query on
+     * @param dql    query whose first row's first column is the count
+     * @param params values bound, in order, to the query's {@code ?} placeholders
+     */
+    private void assertNumRecords(int count, Connection conn, String dql, String... params)
+            throws Exception {
+        // Close the statement and result set even when an assertion fails.
+        try (PreparedStatement stmt = conn.prepareStatement(dql)) {
+            for (int i = 0; i < params.length; i++) {
+                stmt.setString(i + 1, params[i]); // JDBC parameter indexes are 1-based
+            }
+            try (ResultSet rs = stmt.executeQuery()) {
+                assertTrue("No row returned by: " + dql, rs.next());
+                assertEquals(count, rs.getInt(1));
+            }
+        }
+    }
+
+    /**
+     * Reads the first row of {@code tableName} directly from HBase. Asserts that the latest cell
+     * for the empty key value column in the default column family has timestamp
+     * {@code expectedTimestamp}.
+     *
+     * @param tableName         physical HBase table name to scan
+     * @param expectedTimestamp expected HBase cell timestamp
+     */
+    private void assertHBaseRowTimestamp(String tableName, long expectedTimestamp) throws Exception {
+        Scan scan = new Scan();
+        // NOTE(review): assumes the table was created with encoded column qualifiers;
+        // confirm if the table DDL changes.
+        byte[] emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(true).getFirst();
+        // The HBase connection, table and scanner are all released, even on assertion failure.
+        try (org.apache.hadoop.hbase.client.Connection hconn =
+                     ConnectionFactory.createConnection(config);
+             Table table = hconn.getTable(TableName.valueOf(tableName));
+             ResultScanner resultScanner = table.getScanner(scan)) {
+            Result result = resultScanner.next();
+            assertTrue("No row found in HBase table " + tableName, result != null);
+            assertTrue("Empty key value cell missing in HBase table " + tableName,
+                    result.containsColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                            emptyKVQualifier));
+            long actualTimestamp = result.getColumnLatestCell(
+                    QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp();
+            assertEquals(expectedTimestamp, actualTimestamp);
+        }
+    }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 96b4f18..8c40def 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -610,7 +610,9 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
}
Mutation m = miniBatchOp.getOperation(i);
// skip this mutation if we aren't enabling indexing or not an atomic op
- if (!builder.isEnabled(m) && !builder.isAtomicOp(m)) {
+ // or if it is an atomic op and its timestamp is already set(not LATEST)
+ if (!builder.isEnabled(m) &&
+ !(builder.isAtomicOp(m) && getMaxTimestamp(m) == HConstants.LATEST_TIMESTAMP)) {
continue;
}
setTimestampOnMutation(m, ts);