You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2021/06/16 22:57:51 UTC
[phoenix] branch 4.x updated: PHOENIX-6387: Conditional updates on
tables with indexes
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 68ef271 PHOENIX-6387: Conditional updates on tables with indexes
68ef271 is described below
commit 68ef27155d0e3453efe4325c8d08199b1ebaef5c
Author: tkhurana <kh...@gmail.com>
AuthorDate: Wed Jun 16 15:57:41 2021 -0700
PHOENIX-6387: Conditional updates on tables with indexes
* PHOENIX-6387 Conditional updates on tables with indexes (#1215)
* PHOENIX-6387 Conditional updates on tables with indexes client side
* PHOENIX-6387 Conditional updates on tables with indexes server side
* Remove the extra read for regular upserts on tables with local indexes
* Addressed review comments
* PHOENIX-6474 Client and server metrics for atomic upserts (#1237)
* PHOENIX-6474 Client and server metrics for atomic upserts
* Fixed failing tests related to metrics
---
.../apache/phoenix/end2end/OnDuplicateKeyIT.java | 111 +++-
.../apache/phoenix/end2end/WALAnnotationIT.java | 34 ++
.../end2end/index/GlobalIndexCheckerIT.java | 28 +
.../phoenix/monitoring/BasePhoenixMetricsIT.java | 2 +-
.../phoenix/monitoring/PhoenixMetricsIT.java | 2 +-
.../monitoring/PhoenixTableLevelMetricsIT.java | 46 ++
.../org/apache/phoenix/compile/UpsertCompiler.java | 6 -
.../coprocessor/GlobalIndexRegionScanner.java | 18 +-
.../org/apache/phoenix/execute/MutationState.java | 25 +-
.../phoenix/hbase/index/IndexRegionObserver.java | 624 ++++++++++++++++++---
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 10 +
.../org/apache/phoenix/monitoring/MetricType.java | 6 +
.../phoenix/monitoring/MutationMetricQueue.java | 13 +-
.../phoenix/monitoring/TableClientMetrics.java | 8 +-
.../java/org/apache/phoenix/schema/PTableImpl.java | 2 +-
.../apache/phoenix/compile/QueryCompilerTest.java | 15 -
16 files changed, 817 insertions(+), 133 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
index fde31e4..0878c07 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java
@@ -36,7 +36,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.hadoop.hbase.TableName;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -64,6 +68,12 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
testCases.add(new String[] {
"create local index %s_IDX on %s(counter1, counter2)",
});
+ testCases.add(new String[] {
+ "create index %s_IDX on %s(counter1) include (counter2)",
+ });
+ testCases.add(new String[] {
+ "create index %s_IDX on %s(counter1, counter2)",
+ });
return testCases;
}
@@ -498,18 +508,29 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
exec.shutdownNow();
int finalResult = nThreads * nCommits * nIncrementsPerCommit;
- //assertEquals(finalResult,resultHolder[0]);
- ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0");
+ boolean isIndexCreated = this.indexDDL != null && this.indexDDL.length() > 0;
+
+ ResultSet rs;
+ String selectSql = "SELECT * FROM " + tableName + " WHERE counter1 >= 0";
+ if (isIndexCreated) {
+ rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ IndexToolIT.assertExplainPlan(this.indexDDL.contains("local"), actualExplainPlan,
+ tableName, tableName + "_IDX");
+ }
+ rs = conn.createStatement().executeQuery(selectSql);
assertTrue(rs.next());
assertEquals("a",rs.getString(1));
assertEquals(finalResult,rs.getInt(2));
assertFalse(rs.next());
- rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE counter1 >= 0");
- assertTrue(rs.next());
- assertEquals("a",rs.getString(1));
- assertEquals(finalResult,rs.getInt(2));
- assertFalse(rs.next());
+ if (isIndexCreated) {
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE counter1 >= 0");
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals(finalResult, rs.getInt(2));
+ assertFalse(rs.next());
+ }
conn.close();
}
@@ -648,6 +669,82 @@ public class OnDuplicateKeyIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testMultiplePartialUpdatesInSameBatch() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ String dml;
+ ResultSet rs;
+ // first commit
+ dml = String.format("UPSERT INTO %s VALUES('a',0,0)", tableName);
+ conn.createStatement().execute(dml);
+ conn.commit();
+ // batch multiple conditional updates (partial) in a single batch
+ dml = String.format(
+ "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1", tableName);
+ conn.createStatement().execute(dml);
+ dml = String.format(
+ "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter2 = counter2 + 2", tableName);
+ conn.createStatement().execute(dml);
+ dml = String.format(
+ "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter1 = counter1 + 100", tableName);
+ conn.createStatement().execute(dml);
+ dml = String.format(
+ "UPSERT INTO %s VALUES('a',2,3) ON DUPLICATE KEY UPDATE counter2 = counter2 + 200", tableName);
+ conn.createStatement().execute(dml);
+ conn.commit();
+ String dql = String.format("SELECT counter1, counter2 FROM %s WHERE counter1 > 0", tableName);
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals(101, rs.getInt(1));
+ assertEquals(202, rs.getInt(2));
+ }
+ }
+
+ @Test
+ public void testComplexDuplicateKeyExpression() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName +
+ "(pk varchar primary key, counter1 bigint, counter2 bigint, approval varchar)";
+ conn.createStatement().execute(ddl);
+ createIndex(conn, tableName);
+ String dml;
+ dml = String.format("UPSERT INTO %s VALUES('abc', 0, 100, 'NONE')", tableName);
+ conn.createStatement().execute(dml);
+ conn.commit();
+ dml = String.format("UPSERT INTO %s(pk, counter1, counter2) VALUES ('abc', 0, 10) " +
+ "ON DUPLICATE KEY UPDATE " +
+ "counter1 = counter1 + counter2," +
+ "approval = CASE WHEN counter1 < 100 THEN 'NONE' " +
+ "WHEN counter1 < 1000 THEN 'MANAGER_APPROVAL' " +
+ "ELSE 'VP_APPROVAL' END", tableName);
+ conn.createStatement().execute(dml);
+ conn.commit();
+ String dql = "SELECT * from " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(100, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertEquals("NONE", rs.getString("approval"));
+
+ conn.createStatement().execute(dml);
+ conn.commit();
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals("abc", rs.getString("pk"));
+ assertEquals(200, rs.getInt("counter1"));
+ assertEquals(100, rs.getInt("counter2"));
+ assertEquals("MANAGER_APPROVAL", rs.getString("approval"));
+ }
+ }
+
private void assertRow(Connection conn, String tableName, String expectedPK, int expectedCol1, String expectedCol2) throws SQLException {
ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
assertTrue(rs.next());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
index e4618c7..77cbde0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
@@ -456,6 +456,40 @@ public class WALAnnotationIT extends BaseUniqueNamesOwnClusterIT {
tenantViewHelper(true);
}
+ @Test
+ public void testOnDuplicateUpsertWithIndex() throws Exception {
+ Assume.assumeFalse(this.isImmutable); // on duplicate is not supported for immutable tables
+ Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+ SchemaBuilder builder = new SchemaBuilder(getUrl());
+ try (Connection conn = getConnection()) {
+ SchemaBuilder.TableOptions tableOptions = getTableOptions();
+ builder.withTableOptions(tableOptions).withTableIndexDefaults().build();
+ PTable table = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTableName());
+ assertEquals("Change Detection Enabled is false!", true, table.isChangeDetectionEnabled());
+ Long ddlTimestamp = table.getLastDDLTimestamp();
+ String upsertSql = "UPSERT INTO " + builder.getEntityTableName() + " VALUES" +
+ " ('a', 'b', 'c', 'd')";
+ conn.createStatement().execute(upsertSql);
+ conn.commit();
+ List<String> columns = builder.getTableOptions().getTableColumns();
+ assertTrue(columns.size() >= 2);
+ String col1 = columns.get(0);
+ String col2 = columns.get(1);
+ // col1 = col1 || col1, col2 = null
+ String onDupClause = String.format("%s = %s || %s, %s = null", col1, col1, col1, col2);
+ // this will result in one Put and one Delete (because of null) mutation
+ upsertSql = "UPSERT INTO " + builder.getEntityTableName() + " VALUES" +
+ " ('a', 'b', 'c', 'd') ON DUPLICATE KEY UPDATE " + onDupClause;
+ conn.createStatement().execute(upsertSql);
+ conn.commit();
+ assertAnnotation(2, builder.getPhysicalTableName(false), null,
+ builder.getTableOptions().getSchemaName(),
+ builder.getDataOptions().getTableName(), PTableType.TABLE, ddlTimestamp);
+ assertAnnotation(0, builder.getPhysicalTableIndexName(false),
+ null, null, null, null, ddlTimestamp);
+ }
+ }
+
private List<Map<String, byte[]>> getEntriesForTable(TableName tableName) throws IOException {
AnnotatedWALObserver c = getTestCoprocessor(tableName);
List<Map<String, byte[]>> entries = c.getWalAnnotationsByTable(tableName);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 47eea6d..7668d55 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -1022,6 +1022,34 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ @Test
+ public void testOnDuplicateKeyWithIndex() throws Exception {
+ if (async || encoded) { // run only once with single cell encoding enabled
+ return;
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexTableName = generateUniqueName();
+ populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1) include (val2, val3)" + this.indexDDLOptions);
+ conn.commit();
+ String upsertSql = "UPSERT INTO " + dataTableName + " VALUES ('a') ON DUPLICATE KEY UPDATE " +
+ "val1 = val1 || val1, val2 = val2 || val2";
+ conn.createStatement().execute(upsertSql);
+ conn.commit();
+ String selectSql = "SELECT * from " + dataTableName + " WHERE val1 = 'abab'";
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ ResultSet rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("abab", rs.getString(2));
+ assertEquals("abcabc", rs.getString(3));
+ assertEquals("abcd", rs.getString(4));
+ assertFalse(rs.next());
+ }
+ }
+
static private void commitWithException(Connection conn) {
try {
conn.commit();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
index 7c58945..45b3561 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
@@ -113,7 +113,7 @@ public class BasePhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
String t = entry.getKey();
assertEquals("Table names didn't match!", tableName, t);
Map<MetricType, Long> p = entry.getValue();
- assertEquals("There should have been fifteen metrics", 15, p.size());
+ assertEquals("There should have been sixteen metrics", 16, p.size());
boolean mutationBatchSizePresent = false;
boolean mutationCommitTimePresent = false;
boolean mutationBytesPresent = false;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index dc27dee..32d2f67 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -487,7 +487,7 @@ public class PhoenixMetricsIT extends BasePhoenixMetricsIT {
String t = entry.getKey();
assertEquals("Table names didn't match!", tableName, t);
Map<MetricType, Long> p = entry.getValue();
- assertEquals("There should have been five metrics", 15, p.size());
+ assertEquals("There should have been sixteen metrics", 16, p.size());
boolean mutationBatchSizePresent = false;
boolean mutationCommitTimePresent = false;
boolean mutationBytesPresent = false;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index b1215cd..2c1aaf9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -56,6 +56,8 @@ import org.junit.experimental.categories.Category;
import static org.apache.phoenix.exception.SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY;
import static org.apache.phoenix.exception.SQLExceptionCode.GET_TABLE_REGIONS_FAIL;
import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
@@ -1148,6 +1150,50 @@ public class PhoenixTableLevelMetricsIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ @Test public void testTableLevelMetricsForAtomicUpserts() throws Throwable {
+ String tableName = generateUniqueName();
+ Connection conn = null;
+ Throwable exception = null;
+ int numAtomicUpserts = 4;
+ try {
+ conn = getConnFromTestDriver();
+ String ddl = "create table " + tableName + "(pk varchar primary key, counter1 bigint)";
+ conn.createStatement().execute(ddl);
+ String dml;
+ ResultSet rs;
+ dml = String.format("UPSERT INTO %s VALUES('a', 0)", tableName);
+ conn.createStatement().execute(dml);
+ dml = String.format("UPSERT INTO %s VALUES('a', 0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1", tableName);
+ for (int i = 0; i < numAtomicUpserts; ++i) {
+ conn.createStatement().execute(dml);
+ }
+ conn.commit();
+ String dql = String.format("SELECT counter1 FROM %s WHERE counter1 > 0", tableName);
+ rs = conn.createStatement().executeQuery(dql);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ }catch (Throwable t) {
+ exception = t;
+ } finally {
+ // Otherwise the test fails with an error from assertions below instead of the real exception
+ if (exception != null) {
+ throw exception;
+ }
+ assertNotNull("Failed to get a connection!", conn);
+ // Get write metrics before closing the connection since that clears those metrics
+ Map<MetricType, Long>
+ writeMutMetrics =
+ getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
+ conn.close();
+ // 1 regular upsert + numAtomicUpserts
+ // 2 mutations (regular and atomic on the same row in the same batch will be split)
+ assertMutationTableMetrics(true, tableName, 1 + numAtomicUpserts, 0, 0, true, 2, 0, 0, 2, 0,
+ writeMutMetrics, conn);
+ assertEquals(numAtomicUpserts, getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_SQL_COUNTER));
+ assertTrue(getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_COMMIT_TIME) > 0);
+ }
+ }
+
private Connection getConnFromTestDriver() throws SQLException {
Connection conn = DriverManager.getConnection(url);
assertTrue(conn.unwrap(PhoenixConnection.class)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index f1aa249..075c537 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -864,12 +864,6 @@ public class UpsertCompiler {
.setTableName(table.getTableName().getString())
.build().buildException();
}
- if (SchemaUtil.hasGlobalIndex(table)) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_WITH_GLOBAL_IDX)
- .setSchemaName(table.getSchemaName().getString())
- .setTableName(table.getTableName().getString())
- .build().buildException();
- }
if (onDupKeyPairs.isEmpty()) { // ON DUPLICATE KEY IGNORE
onDupKeyBytesToBe = PhoenixIndexBuilder.serializeOnDupKeyIgnore();
} else { // ON DUPLICATE KEY UPDATE;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 6a10edc..fbff011 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -256,26 +256,30 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
}
@Override
public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
- List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
- if (cellList == null || cellList.isEmpty()) {
+ Cell cell = getLatestCell(ref, ts);
+ if (cell == null) {
return null;
}
- Cell cell = cellList.get(0);
valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
return valuePtr;
}
- @Override
- public KeyValue getLatestKeyValue(ColumnReference ref, long ts) {
+ public Cell getLatestCell(ColumnReference ref, long ts) {
List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
if (cellList == null || cellList.isEmpty()) {
return null;
}
- Cell cell = cellList.get(0);
- return new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ return cellList.get(0);
+ }
+ @Override
+ public KeyValue getLatestKeyValue(ColumnReference ref, long ts) {
+ Cell cell = getLatestCell(ref, ts);
+ KeyValue kv = cell == null ? null :
+ new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ return kv;
}
@Override
public byte[] getRowKey() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1efbcf0..68c0d02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -955,6 +955,7 @@ public class MutationState implements SQLCloseable {
long tempSize;
long deleteSize = 0, deleteCounter = 0;
long upsertsize = 0, upsertCounter = 0;
+ long atomicUpsertsize = 0;
if (GlobalClientMetrics.isMetricsEnabled()) {
for (Mutation mutation : mutations) {
tempSize = KeyValueUtil.calculateMutationDiskSize(mutation);
@@ -966,6 +967,9 @@ public class MutationState implements SQLCloseable {
}else if(mutation instanceof Put) {
upsertsize += tempSize;
upsertCounter++;
+ if (mutation.getAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB) != null) {
+ atomicUpsertsize += tempSize;
+ }
allDeletesMutations = false;
} else {
allUpsertsMutations = false;
@@ -976,7 +980,7 @@ public class MutationState implements SQLCloseable {
if (updateGlobalClientMetrics) {
GLOBAL_MUTATION_BYTES.update(byteSize);
}
- return new MutationBytes(deleteCounter, deleteSize, byteSize, upsertCounter, upsertsize);
+ return new MutationBytes(deleteCounter, deleteSize, byteSize, upsertCounter, upsertsize, atomicUpsertsize);
}
public long getBatchSizeBytes() {
@@ -994,14 +998,16 @@ public class MutationState implements SQLCloseable {
private long totalMutationBytes;
private long upsertMutationCounter;
private long upsertMutationBytes;
+ private long atomicUpsertMutationBytes; // needed to calculate atomic upsert commit time
- public MutationBytes(long deleteMutationCounter, long deleteMutationBytes, long totalMutationBytes, long
- upsertMutationCounter, long upsertMutationBytes) {
+ public MutationBytes(long deleteMutationCounter, long deleteMutationBytes, long totalMutationBytes,
+ long upsertMutationCounter, long upsertMutationBytes, long atomicUpsertMutationBytes) {
this.deleteMutationCounter = deleteMutationCounter;
this.deleteMutationBytes = deleteMutationBytes;
this.totalMutationBytes = totalMutationBytes;
this.upsertMutationCounter = upsertMutationCounter;
this.upsertMutationBytes = upsertMutationBytes;
+ this.atomicUpsertMutationBytes = atomicUpsertMutationBytes;
}
@@ -1024,6 +1030,8 @@ public class MutationState implements SQLCloseable {
public long getUpsertMutationBytes() {
return upsertMutationBytes;
}
+
+ public long getAtomicUpsertMutationBytes() { return atomicUpsertMutationBytes; }
}
public enum MutationMetadataType {
@@ -1542,7 +1550,7 @@ public class MutationState implements SQLCloseable {
// in case we are dealing with all deletes for a non-transactional table, since there is a
// bug in sendMutations where we don't get the correct value for numFailedMutations when
// we don't use transactions
- return new MutationMetricQueue.MutationMetric(0, 0, 0, 0, 0,
+ return new MutationMetricQueue.MutationMetric(0, 0, 0, 0, 0, 0,
allDeletesMutations && !isTransactional ? numDeleteMutationsInBatch : numFailedMutations,
0, 0, 0, 0,
numUpsertMutationsInBatch,
@@ -1571,6 +1579,8 @@ public class MutationState implements SQLCloseable {
long numFailedPhase3Mutations, long mutationCommitTime) {
long committedUpsertMutationBytes = totalMutationBytesObject == null ? 0 :
totalMutationBytesObject.getUpsertMutationBytes();
+ long committedAtomicUpsertMutationBytes = totalMutationBytesObject == null ? 0:
+ totalMutationBytesObject.getAtomicUpsertMutationBytes();
long committedDeleteMutationBytes = totalMutationBytesObject == null ? 0 :
totalMutationBytesObject.getDeleteMutationBytes();
long committedUpsertMutationCounter = totalMutationBytesObject == null ? 0 :
@@ -1580,6 +1590,7 @@ public class MutationState implements SQLCloseable {
long committedTotalMutationBytes = totalMutationBytesObject == null ? 0 :
totalMutationBytesObject.getTotalMutationBytes();
long upsertMutationCommitTime = 0L;
+ long atomicUpsertMutationCommitTime = 0L;
long deleteMutationCommitTime = 0L;
if (totalMutationBytesObject != null && numFailedMutations != 0) {
@@ -1592,6 +1603,8 @@ public class MutationState implements SQLCloseable {
calculateMutationSize(uncommittedMutationsList, false);
committedUpsertMutationBytes -=
uncommittedMutationBytesObject.getUpsertMutationBytes();
+ committedAtomicUpsertMutationBytes -=
+ uncommittedMutationBytesObject.getAtomicUpsertMutationBytes();
committedDeleteMutationBytes -=
uncommittedMutationBytesObject.getDeleteMutationBytes();
committedUpsertMutationCounter -=
@@ -1606,6 +1619,9 @@ public class MutationState implements SQLCloseable {
upsertMutationCommitTime =
(long)Math.floor((double)(committedUpsertMutationBytes * mutationCommitTime)/
committedTotalMutationBytes);
+ atomicUpsertMutationCommitTime =
+ (long)Math.floor((double)(committedAtomicUpsertMutationBytes * mutationCommitTime)/
+ committedTotalMutationBytes);
deleteMutationCommitTime =
(long)Math.ceil((double)(committedDeleteMutationBytes * mutationCommitTime)/
committedTotalMutationBytes);
@@ -1614,6 +1630,7 @@ public class MutationState implements SQLCloseable {
committedUpsertMutationBytes,
committedDeleteMutationBytes,
upsertMutationCommitTime,
+ atomicUpsertMutationCommitTime,
deleteMutationCommitTime,
0, // num failed mutations have been counted already in updateMutationBatchFailureMetrics()
committedUpsertMutationCounter,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index e1294f0..96b4f18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -17,21 +17,10 @@
*/
package org.apache.phoenix.hbase.index;
-import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -41,6 +30,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -51,6 +41,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -59,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
@@ -67,11 +59,20 @@ import org.apache.phoenix.compat.hbase.coprocessor.CompatIndexRegionObserver;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
+import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.SimpleValueGetter;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.LockManager.RowLock;
import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
import org.apache.phoenix.hbase.index.builder.IndexBuilder;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
@@ -80,13 +81,20 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -96,12 +104,28 @@ import org.apache.phoenix.util.WALAnnotationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
+import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
+import static org.apache.phoenix.index.PhoenixIndexBuilder.ATOMIC_OP_ATTRIB;
/**
* Do all the work of managing index updates from a single coprocessor. All Puts/Delets are passed
@@ -467,7 +491,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
continue;
}
Mutation m = miniBatchOp.getOperation(i);
- if (this.builder.isEnabled(m)) {
+ if (this.builder.isAtomicOp(m) || this.builder.isEnabled(m)) {
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
if (!context.rowsToLock.contains(row)) {
context.rowsToLock.add(row);
@@ -476,6 +500,53 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
}
}
+ /**
+ * Add the mutations generated by the ON DUPLICATE KEY UPDATE to the current batch.
+ * MiniBatchOperationInProgress#addOperationsFromCP() allows coprocessors to attach additional mutations
+ * to the incoming mutation. These additional mutations are only executed if the status of the original
+ * mutation is set to NOT_RUN. For atomic mutations, we want HBase to ignore the incoming mutation and
+ * instead execute the mutations generated by the server for that atomic mutation. But we can't achieve
+ * this behavior just by setting the status of the original mutation to IGNORE because that will also
+ * ignore the additional mutations added by the coprocessors. To get around this, we need to do a fixup
+ * of the original mutation in the batch. Since we always generate one Put mutation from the incoming atomic
+ * Put mutation, we can transfer the cells from the generated Put mutation to the original atomic Put mutation in the batch.
+ * The additional mutations (Delete) can then be added to the operationsFromCoprocessors array.
+ */
+ private void addOnDupMutationsToBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ int index, List<Mutation> mutations) {
+ List<Delete> deleteMutations = Lists.newArrayListWithExpectedSize(mutations.size());
+ for (Mutation m : mutations) {
+ if (m instanceof Put) {
+ // fix the incoming atomic mutation
+ Mutation original = miniBatchOp.getOperation(index);
+ original.getFamilyCellMap().putAll(m.getFamilyCellMap());
+ } else if (m instanceof Delete) {
+ deleteMutations.add((Delete)m);
+ }
+ }
+
+ if (!deleteMutations.isEmpty()) {
+ miniBatchOp.addOperationsFromCP(index,
+ deleteMutations.toArray(new Mutation[deleteMutations.size()]));
+ }
+ }
+
+ private void addOnDupMutationsToBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext context) throws IOException {
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ if (this.builder.isAtomicOp(m) && m instanceof Put) {
+ List<Mutation> mutations = generateOnDupMutations(context, (Put)m);
+ if (!mutations.isEmpty()) {
+ addOnDupMutationsToBatch(miniBatchOp, i, mutations);
+ } else {
+ // empty list of generated mutations implies ON DUPLICATE KEY IGNORE
+ miniBatchOp.setOperationStatus(i, IGNORE);
+ }
+ }
+ }
+ }
+
private void lockRows(BatchMutateContext context) throws IOException {
for (ImmutableBytesPtr rowKey : context.rowsToLock) {
context.rowLocks.add(lockManager.lockRow(rowKey, rowLockWaitDuration));
@@ -520,29 +591,48 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
context.multiMutationMap.put(row, stored);
}
stored.addAll(m);
+ Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);
+ if (mutationsAddedByCP != null) {
+ for (Mutation addedMutation : mutationsAddedByCP) {
+ stored.addAll(addedMutation);
+ }
+ }
}
}
return context.multiMutationMap.values();
}
- public static void setTimestamps(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexBuildManager builder, long ts) throws IOException {
+ public static void setTimestamps(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ IndexBuildManager builder, long ts) throws IOException {
for (Integer i = 0; i < miniBatchOp.size(); i++) {
if (miniBatchOp.getOperationStatus(i) == IGNORE) {
continue;
}
Mutation m = miniBatchOp.getOperation(i);
- // skip this mutation if we aren't enabling indexing
- if (!builder.isEnabled(m)) {
+ // skip this mutation only if indexing is not enabled for it and it is not an atomic op
+ if (!builder.isEnabled(m) && !builder.isAtomicOp(m)) {
continue;
}
- for (List<Cell> cells : m.getFamilyCellMap().values()) {
- for (Cell cell : cells) {
- CellUtil.setTimestamp(cell, ts);
+ setTimestampOnMutation(m, ts);
+
+ // set the timestamps on any additional mutations added
+ Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);
+ if (mutationsAddedByCP != null) {
+ for (Mutation addedMutation : mutationsAddedByCP) {
+ setTimestampOnMutation(addedMutation, ts);
}
}
}
}
+ private static void setTimestampOnMutation(Mutation m, long ts) throws IOException {
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ CellUtil.setTimestamp(cell, ts);
+ }
+ }
+ }
+
/**
* This method applies pending delete mutations on the next row states
*/
@@ -559,38 +649,51 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
if (!(m instanceof Delete)) {
continue;
}
- ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
- Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
- if (dataRowState == null) {
- dataRowState = new Pair<Put, Put>(null, null);
- context.dataRowStates.put(rowKeyPtr, dataRowState);
+
+ if (!applyOnePendingDeleteMutation(context, (Delete) m)) {
+ miniBatchOp.setOperationStatus(i, NOWRITE);
}
- Put nextDataRowState = dataRowState.getSecond();
- if (nextDataRowState == null) {
- if (dataRowState.getFirst() == null) {
- // This is a delete row mutation on a non-existing row. There is no need to apply this mutation
- // on the data table
- miniBatchOp.setOperationStatus(i, NOWRITE);
- }
- continue;
+ }
+ }
+
+ /**
+ * This method returns true if the pending delete mutation needs to be applied
+ * and false if the delete mutation can be ignored, for example in the case of
+ * a delete on a non-existing row.
+ */
+ private boolean applyOnePendingDeleteMutation(BatchMutateContext context, Delete delete) {
+ ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(delete.getRow());
+ Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+ if (dataRowState == null) {
+ dataRowState = new Pair<Put, Put>(null, null);
+ context.dataRowStates.put(rowKeyPtr, dataRowState);
+ }
+ Put nextDataRowState = dataRowState.getSecond();
+ if (nextDataRowState == null) {
+ if (dataRowState.getFirst() == null) {
+ // This is a delete row mutation on a non-existing row. There is no need to apply this mutation
+ // on the data table
+ return false;
}
- for (List<Cell> cells : m.getFamilyCellMap().values()) {
- for (Cell cell : cells) {
- switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
- case DeleteFamily:
- case DeleteFamilyVersion:
- nextDataRowState.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
- break;
- case DeleteColumn:
- case Delete:
- removeColumn(nextDataRowState, cell);
- }
+ }
+
+ for (List<Cell> cells : delete.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
+ case DeleteFamily:
+ case DeleteFamilyVersion:
+ nextDataRowState.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+ break;
+ case DeleteColumn:
+ case Delete:
+ removeColumn(nextDataRowState, cell);
}
}
- if (nextDataRowState != null && nextDataRowState.getFamilyCellMap().size() == 0) {
- dataRowState.setSecond(null);
- }
}
+ if (nextDataRowState != null && nextDataRowState.getFamilyCellMap().size() == 0) {
+ dataRowState.setSecond(null);
+ }
+ return true;
}
/**
@@ -608,21 +711,32 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
if (!this.builder.isEnabled(m)) {
continue;
}
- if (m instanceof Put) {
- ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
- Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
- if (dataRowState == null) {
- dataRowState = new Pair<Put, Put>(null, null);
- context.dataRowStates.put(rowKeyPtr, dataRowState);
+
+ if (!(m instanceof Put)) {
+ continue;
+ }
+
+ ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+ Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+ if (dataRowState == null) {
+ dataRowState = new Pair<Put, Put>(null, null);
+ context.dataRowStates.put(rowKeyPtr, dataRowState);
+ }
+ Put nextDataRowState = dataRowState.getSecond();
+ dataRowState.setSecond((nextDataRowState != null) ? applyNew((Put) m, nextDataRowState) : new Put((Put) m));
+
+ Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);
+ if (mutationsAddedByCP != null) {
+ // all added mutations are Deletes, generated for the columns that were set to null
+ for (Mutation addedMutation : mutationsAddedByCP) {
+ applyOnePendingDeleteMutation(context, (Delete)addedMutation);
}
- Put nextDataRowState = dataRowState.getSecond();
- dataRowState.setSecond((nextDataRowState != null) ? applyNew((Put) m, nextDataRowState) : new Put((Put) m));
}
}
}
/**
- * * Prepares data row current and next row states
+ * Prepares next data row state
*/
private void prepareDataRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp,
@@ -631,8 +745,6 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
if (context.rowsToLock.size() == 0) {
return;
}
- // Retrieve the current row states from the data table
- getCurrentRowStates(c, context);
applyPendingPutMutations(miniBatchOp, context, now);
applyPendingDeleteMutations(miniBatchOp, context);
}
@@ -677,6 +789,10 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
localUpdates.add(next.getFirst());
}
if (!localUpdates.isEmpty()) {
+ Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(0);
+ if (mutationsAddedByCP != null) {
+ localUpdates.addAll(Arrays.asList(mutationsAddedByCP));
+ }
miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
}
}
@@ -885,8 +1001,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
}
}
}
- removePendingRows(context);
- context.indexUpdates.clear();
+ // all cleanup will be done in postBatchMutateIndispensably()
}
private static boolean hasGlobalIndex(PhoenixIndexMetaData indexMetaData) {
@@ -907,6 +1022,19 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
return false;
}
+ private boolean hasAtomicUpdate(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+ continue;
+ }
+ Mutation m = miniBatchOp.getOperation(i);
+ if (this.builder.isAtomicOp(m)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context)
throws Throwable {
boolean done;
@@ -914,10 +1042,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
done = true;
for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) {
phase = lastContext.getCurrentPhase();
- if (phase == BatchMutatePhase.FAILED) {
- done = false;
- break;
- }
+
if (phase == BatchMutatePhase.PRE) {
CountDownLatch countDownLatch = lastContext.getCountDownLatch();
// Release the locks so that the previous concurrent mutation can go into the post phase
@@ -926,23 +1051,30 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
// lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed by lastContext
if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration,
TimeUnit.MILLISECONDS)) {
+ LOG.debug(String.format("latch timeout context %s last %s", context, lastContext));
done = false;
- break;
}
// Acquire the locks again before letting the region proceed with data table updates
lockRows(context);
+ if (!done) {
+ // previous concurrent batch did not complete so we have to retry this batch
+ break;
+ } else {
+ // read the phase again to determine the status of previous batch
+ phase = lastContext.getCurrentPhase();
+ LOG.debug(String.format("context %s last %s exit phase %s", context, lastContext, phase));
+ }
+ }
+
+ if (phase == BatchMutatePhase.FAILED) {
+ done = false;
+ break;
}
}
if (!done) {
// This batch needs to be retried since one of the previous concurrent batches has not completed yet.
- // Throwing an IOException will result in retries of this batch. Before throwing exception,
- // we need to remove reference counts and locks for the rows of this batch
- removePendingRows(context);
- context.indexUpdates.clear();
- for (RowLock rowLock : context.rowLocks) {
- rowLock.release();
- }
- context.rowLocks.clear();
+ // Throwing an IOException will result in retries of this batch. Removal of reference counts and
+ // locks for the rows of this batch will be done in postBatchMutateIndispensably()
throw new IOException("One of the previous concurrent mutations has not completed. " +
"The batch needs to be retried " + table.getNameAsString());
}
@@ -950,25 +1082,52 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
- ignoreAtomicOperations(miniBatchOp);
PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
setBatchMutateContext(c, context);
context.populateOriginalMutations(miniBatchOp);
+
// Need to add cell tags to Delete Marker before we do any index processing
// since we add tags to tables which doesn't have indexes also.
IndexUtil.setDeleteAttributes(miniBatchOp);
- /*
- * Exclusively lock all rows so we get a consistent read
- * while determining the index updates
- */
+ // Exclusively lock all rows so we get a consistent read while
+ // determining the index updates
populateRowsToLock(miniBatchOp, context);
// early exit if it turns out we don't have any update for indexes
if (context.rowsToLock.isEmpty()) {
return;
}
lockRows(context);
+
+ boolean hasAtomic = hasAtomicUpdate(miniBatchOp);
+ long onDupCheckTime = 0;
+
+ if (hasAtomic || hasGlobalIndex(indexMetaData)) {
+ // Retrieve the current row states from the data table while holding the lock.
+ // This is needed for both atomic mutations and global indexes
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ getCurrentRowStates(c, context);
+ onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() - start);
+ }
+
+ if (hasAtomic) {
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ // add the mutations for conditional updates to the mini batch
+ addOnDupMutationsToBatch(miniBatchOp, context);
+
+ // release locks for ON DUPLICATE KEY IGNORE since we won't be changing those rows
+ // this is needed so that we can exit early
+ releaseLocksForOnDupIgnoreMutations(miniBatchOp, context);
+ onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() - start);
+ metricSource.updateDuplicateKeyCheckTime(dataTableName, onDupCheckTime);
+
+ // early exit if we are not changing any rows
+ if (context.rowsToLock.isEmpty()) {
+ return;
+ }
+ }
+
long now = EnvironmentEdgeManager.currentTimeMillis();
// Update the timestamps of the data table mutations to prevent overlapping timestamps (which prevents index
// inconsistencies as this case isn't handled correctly currently).
@@ -976,7 +1135,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
if (hasGlobalIndex(indexMetaData)) {
- // Prepare current and next data rows states for pending mutations (for global indexes)
+ // Prepare next data rows states for pending mutations (for global indexes)
prepareDataRowStates(c, miniBatchOp, context, now);
// Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
// concurrent updates
@@ -1014,17 +1173,47 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
}
}
+ /**
+ * In case of ON DUPLICATE KEY IGNORE, if the row already exists no mutations will be
+ * generated so release the row lock.
+ */
+ private void releaseLocksForOnDupIgnoreMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext context) {
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ // status of all ON DUPLICATE KEY IGNORE mutations is updated to IGNORE
+ if (miniBatchOp.getOperationStatus(i) != IGNORE) {
+ continue;
+ }
+ Mutation m = miniBatchOp.getOperation(i);
+ if (!this.builder.isAtomicOp(m)) {
+ continue;
+ }
+ ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+ Iterator<RowLock> rowLockIterator = context.rowLocks.iterator();
+ while(rowLockIterator.hasNext()){
+ RowLock rowLock = rowLockIterator.next();
+ ImmutableBytesPtr rowKey = rowLock.getRowKey();
+ if (row.equals(rowKey)) {
+ rowLock.release();
+ rowLockIterator.remove();
+ context.rowsToLock.remove(row);
+ break;
+ }
+ }
+ }
+ }
+
private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
this.batchMutateContext.set(context);
}
- private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+ private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
return this.batchMutateContext.get();
- }
+ }
- private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+ private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
this.batchMutateContext.remove();
- }
+ }
@Override
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> c, WALKey key,
@@ -1035,6 +1224,11 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
}
}
+ /**
+ * When this hook is called, all the rows in the batch context are locked. Because the rows
+ * are locked, we can safely make updates to the context object and perform the necessary
+ * cleanup.
+ */
@Override
public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
@@ -1056,6 +1250,10 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
countDownLatch.countDown();
}
}
+ removePendingRows(context);
+ if (context.indexUpdates != null) {
+ context.indexUpdates.clear();
+ }
unlockRows(context);
this.builder.batchCompleted(miniBatchOp);
@@ -1138,10 +1336,10 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
metricSource.updatePreIndexUpdateFailureTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - start);
metricSource.incrementPreIndexUpdateFailures(dataTableName);
- // Remove all locks as they are already unlocked. There is no need to unlock them again later when
- // postBatchMutateIndispensably() is called
- removePendingRows(context);
- context.rowLocks.clear();
+ // Re-acquire all locks since we released them before making index updates
+ // Removal of reference counts and locks for the rows of this batch will be
+ // done in postBatchMutateIndispensably()
+ lockRows(context);
rethrowIndexingException(e);
}
throw new RuntimeException(
@@ -1165,4 +1363,256 @@ public class IndexRegionObserver extends CompatIndexRegionObserver {
properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
}
-}
\ No newline at end of file
+
+ private void extractExpressionsAndColumns(DataInputStream input,
+ List<Pair<PTable, List<Expression>>> operations,
+ final Set<ColumnReference> colsReadInExpr) throws IOException {
+ while (true) {
+ ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
+ @Override
+ public Void visit(KeyValueColumnExpression expression) {
+ colsReadInExpr.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()));
+ return null;
+ }
+ };
+ try {
+ int nExpressions = WritableUtils.readVInt(input);
+ List<Expression> expressions = Lists.newArrayListWithExpectedSize(nExpressions);
+ for (int i = 0; i < nExpressions; i++) {
+ Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+ expression.readFields(input);
+ expressions.add(expression);
+ expression.accept(visitor);
+ }
+ PTableProtos.PTable tableProto = PTableProtos.PTable.parseDelimitedFrom(input);
+ PTable table = PTableImpl.createFromProto(tableProto);
+ operations.add(new Pair<>(table, expressions));
+ } catch (EOFException e) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+ * The critical difference being that the code in PhoenixIndexBuilder#executeAtomicOp()
+ * generates the mutations by reading the latest data table row from HBase but in order
+ * to correctly support concurrent index mutations we need to always read the latest
+ * data table row from memory.
+ * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+ * The list will be empty in the case of ON DUPLICATE KEY IGNORE when the row already exists.
+ * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally
+ * one Delete mutation (with DeleteColumn type cells for all columns set to null).
+ */
+ private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put atomicPut) throws IOException {
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+ byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
+ if (opBytes == null) { // Unexpected
+ return null;
+ }
+ Put put = null;
+ Delete delete = null;
+
+ // mutations returned by this function will have the LATEST timestamp
+ // later these timestamps will be updated by the IndexRegionObserver#setTimestamps() function
+ long ts = HConstants.LATEST_TIMESTAMP;
+
+ byte[] rowKey = atomicPut.getRow();
+ ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(rowKey);
+ // Get the latest data row state
+ Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+ Put currentDataRowState = dataRowState != null ? dataRowState.getFirst() : null;
+
+ if (PhoenixIndexBuilder.isDupKeyIgnore(opBytes)) {
+ if (currentDataRowState == null) {
+ // new row
+ mutations.add(atomicPut);
+ }
+ return mutations;
+ }
+
+ ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
+ DataInputStream input = new DataInputStream(stream);
+ boolean skipFirstOp = input.readBoolean();
+ short repeat = input.readShort();
+
+ List<Pair<PTable, List<Expression>>> operations = Lists.newArrayListWithExpectedSize(3);
+ final Set<ColumnReference> colsReadInExpr = new HashSet<>();
+ // deserialize the conditional update expressions and
+ // extract the columns that are read in the conditional expressions
+ extractExpressionsAndColumns(input, operations, colsReadInExpr);
+ int estimatedSize = colsReadInExpr.size();
+
+ // initialized to either the incoming new row or the current row
+ // stores the intermediate values as we apply conditional update expressions
+ List<Cell> flattenedCells;
+ // read the values of the columns referenced by the conditional expressions from the current data row
+ List<Cell> cells = readColumnsFromRow(currentDataRowState, colsReadInExpr);
+
+ if (currentDataRowState == null) { // row doesn't exist
+ if (skipFirstOp) {
+ if (operations.size() <= 1 && repeat <= 1) {
+ // early exit since there is only one ON DUPLICATE KEY UPDATE
+ // clause which is ignored because the row doesn't exist so
+ // simply use the values in UPSERT VALUES
+ mutations.add(atomicPut);
+ return mutations;
+ }
+ // If there are multiple ON DUPLICATE KEY UPDATE on a new row,
+ // the first one is skipped
+ repeat--;
+ }
+ // Base current state off of new row
+ flattenedCells = flattenCells(atomicPut);
+ } else {
+ // Base current state off of existing row
+ flattenedCells = cells;
+ }
+
+ MultiKeyValueTuple tuple = new MultiKeyValueTuple(flattenedCells);
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+ // for each conditional upsert in the batch
+ for (int opIndex = 0; opIndex < operations.size(); opIndex++) {
+ Pair<PTable, List<Expression>> operation = operations.get(opIndex);
+ PTable table = operation.getFirst();
+ List<Expression> expressions = operation.getSecond();
+ for (int j = 0; j < repeat; j++) { // repeater loop
+ ptr.set(rowKey);
+ // Sort the list of cells (if they've been flattened in which case they're
+ // not necessarily ordered correctly).
+ if (flattenedCells != null) {
+ Collections.sort(flattenedCells, KeyValue.COMPARATOR);
+ }
+ PRow row = table.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false);
+ int adjust = table.getBucketNum() == null ? 1 : 2;
+ for (int i = 0; i < expressions.size(); i++) {
+ Expression expression = expressions.get(i);
+ ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+ expression.evaluate(tuple, ptr);
+ PColumn column = table.getColumns().get(i + adjust);
+ Object value = expression.getDataType().toObject(ptr, column.getSortOrder());
+ // We are guaranteed that the two columns will have the same type
+ if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
+ expression.getSortOrder(), expression.getMaxLength(), expression.getScale(),
+ column.getMaxLength(), column.getScale())) {
+ throw new DataExceedsCapacityException(column.getDataType(), column.getMaxLength(),
+ column.getScale(), column.getName().getString());
+ }
+ column.getDataType().coerceBytes(ptr, value, expression.getDataType(), expression.getMaxLength(),
+ expression.getScale(), expression.getSortOrder(), column.getMaxLength(), column.getScale(),
+ column.getSortOrder(), table.rowKeyOrderOptimizable());
+ byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ row.setValue(column, bytes);
+ }
+ List<Cell> updatedCells = Lists.newArrayListWithExpectedSize(estimatedSize);
+ List<Mutation> newMutations = row.toRowMutations();
+ for (Mutation source : newMutations) {
+ flattenCells(source, updatedCells);
+ }
+ // update the cells to the latest values calculated above
+ flattenedCells = mergeCells(flattenedCells, updatedCells);
+ tuple.setKeyValues(flattenedCells);
+ }
+ // Repeat only applies to first statement
+ repeat = 1;
+ }
+
+ for (int i = 0; i < tuple.size(); i++) {
+ Cell cell = tuple.getValue(i);
+ if (Type.codeToType(cell.getTypeByte()) == Type.Put) {
+ if (put == null) {
+ put = new Put(rowKey);
+ transferAttributes(atomicPut, put);
+ mutations.add(put);
+ }
+ put.add(cell);
+ } else {
+ if (delete == null) {
+ delete = new Delete(rowKey);
+ transferAttributes(atomicPut, delete);
+ mutations.add(delete);
+ }
+ delete.addDeleteMarker(cell);
+ }
+ }
+ return mutations;
+ }
+
+ private List<Cell> readColumnsFromRow(Put currentDataRow, Set<ColumnReference> cols) {
+ if (currentDataRow == null) {
+ return Collections.EMPTY_LIST;
+ }
+
+ List<Cell> columnValues = Lists.newArrayList();
+
+ // no columns were requested: just return any one cell of the row (like FirstKeyOnlyFilter)
+ if (cols.isEmpty()) {
+ for (List<Cell> cells : currentDataRow.getFamilyCellMap().values()) {
+ if (cells == null || cells.isEmpty()) {
+ continue;
+ }
+ columnValues.add(cells.get(0));
+ break;
+ }
+ return columnValues;
+ }
+
+ SimpleValueGetter valueGetter = new SimpleValueGetter(currentDataRow);
+ for (ColumnReference colRef : cols) {
+ Cell cell = valueGetter.getLatestCell(colRef, HConstants.LATEST_TIMESTAMP);
+ if (cell != null) {
+ columnValues.add(cell);
+ }
+ }
+ return columnValues;
+ }
+
+ private static List<Cell> flattenCells(Mutation m) {
+ List<Cell> flattenedCells = new ArrayList<>();
+ flattenCells(m, flattenedCells);
+ return flattenedCells;
+ }
+
+ private static void flattenCells(Mutation m, List<Cell> flattenedCells) {
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ flattenedCells.addAll(cells);
+ }
+ }
+
+ /**
+ * ensure that the generated mutations have all the attributes like schema
+ */
+ private static void transferAttributes(Mutation source, Mutation target) {
+ for (Map.Entry<String, byte[]> entry : source.getAttributesMap().entrySet()) {
+ target.setAttribute(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ * Merges two lists of cells by column (family + qualifier). Every cell in latest is taken.
+ * A cell in current is taken only if latest has no cell for the same column.
+ */
+ private static List<Cell> mergeCells(List<Cell> current, List<Cell> latest) {
+ Map<ColumnReference, Cell> latestColVals = Maps.newHashMapWithExpectedSize(latest.size() + current.size());
+
+ // first take everything present in latest
+ for (Cell cell : latest) {
+ byte[] family = CellUtil.cloneFamily(cell);
+ byte[] qualifier = CellUtil.cloneQualifier(cell);
+ ColumnReference colInfo = new ColumnReference(family, qualifier);
+ latestColVals.put(colInfo, cell);
+ }
+
+ // check for any leftovers in current
+ for (Cell cell : current) {
+ byte[] family = CellUtil.cloneFamily(cell);
+ byte[] qualifier = CellUtil.cloneQualifier(cell);
+ ColumnReference colInfo = new ColumnReference(family, qualifier);
+ if (!latestColVals.containsKey(colInfo)) {
+ latestColVals.put(colInfo, cell);
+ }
+ }
+ return Lists.newArrayList(latestColVals.values());
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 9d7d5f0..2a76887 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.jdbc;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_FAILED_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_SQL_COUNTER;
@@ -511,6 +513,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
boolean success = false;
String tableName = null;
boolean isUpsert = false;
+ boolean isAtomicUpsert = false;
boolean isDelete = false;
MutationState state = null;
MutationPlan plan = null;
@@ -525,6 +528,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
isUpsert = stmt instanceof ExecutableUpsertStatement;
isDelete = stmt instanceof ExecutableDeleteStatement;
+ isAtomicUpsert = isUpsert && ((ExecutableUpsertStatement)stmt).getOnDupKeyPairs() != null;
if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null) {
if(!Strings.isNullOrEmpty(plan.getTargetRef().getTable().getPhysicalName().toString())) {
tableName = plan.getTargetRef().getTable().getPhysicalName().toString();
@@ -596,6 +600,12 @@ public class PhoenixStatement implements Statement, SQLCloseable {
UPSERT_SQL_COUNTER : DELETE_SQL_COUNTER, 1);
TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
UPSERT_SQL_QUERY_TIME : DELETE_SQL_QUERY_TIME, executeMutationTimeSpent);
+ if (isAtomicUpsert) {
+ TableMetricsManager.updateMetricsMethod(tableName,
+ ATOMIC_UPSERT_SQL_COUNTER, 1);
+ TableMetricsManager.updateMetricsMethod(tableName,
+ ATOMIC_UPSERT_SQL_QUERY_TIME, executeMutationTimeSpent);
+ }
if (success) {
TableMetricsManager.updateMetricsMethod(tableName, isUpsert ?
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index dced4ca..ed7d003 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -56,6 +56,12 @@ public enum MetricType {
+ " autoCommit is true, the total time taken for executeMutation + conn.commit",
LogLevel.OFF, PLong.INSTANCE),
+ ATOMIC_UPSERT_SQL_COUNTER("auc", "Counter for number of atomic upsert sql queries", LogLevel.OFF, PLong.INSTANCE),
+ ATOMIC_UPSERT_COMMIT_TIME("aut", "Time it took to commit a batch of atomic upserts", LogLevel.OFF, PLong.INSTANCE),
+ ATOMIC_UPSERT_SQL_QUERY_TIME("auqt", "Time taken by atomic upsert sql queries inside executeMutation or if"
+ + " autoCommit is true, the total time taken for executeMutation + conn.commit",
+ LogLevel.OFF, PLong.INSTANCE),
+
// delete-specific metrics updated during executeMutation
DELETE_SQL_COUNTER("dc", "Counter for number of delete sql queries", LogLevel.OFF, PLong.INSTANCE),
DELETE_SUCCESS_SQL_COUNTER("dssc", "Counter for number of delete sql queries that successfully"
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index b8444c3..a44483a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.monitoring;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_SIZE;
import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
@@ -82,6 +83,7 @@ public class MutationMetricQueue {
publishedMetricsForTable.put(metric.getDeleteMutationsSizeBytes().getMetricType(), metric.getDeleteMutationsSizeBytes().getValue());
publishedMetricsForTable.put(metric.getCommitTimeForMutations().getMetricType(), metric.getCommitTimeForMutations().getValue());
publishedMetricsForTable.put(metric.getTotalCommitTimeForUpserts().getMetricType(), metric.getTotalCommitTimeForUpserts().getValue());
+ publishedMetricsForTable.put(metric.getTotalCommitTimeForAtomicUpserts().getMetricType(), metric.getTotalCommitTimeForAtomicUpserts().getValue());
publishedMetricsForTable.put(metric.getTotalCommitTimeForDeletes().getMetricType(), metric.getTotalCommitTimeForDeletes().getValue());
publishedMetricsForTable.put(metric.getNumFailedMutations().getMetricType(), metric.getNumFailedMutations().getValue());
publishedMetricsForTable.put(metric.getNumOfIndexCommitFailedMutations().getMetricType(), metric.getNumOfIndexCommitFailedMutations().getValue());
@@ -110,6 +112,7 @@ public class MutationMetricQueue {
private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
private final CombinableMetric numFailedMutations = new CombinableMetricImpl(MUTATION_BATCH_FAILED_SIZE);
private final CombinableMetric totalCommitTimeForUpserts = new CombinableMetricImpl(UPSERT_COMMIT_TIME);
+ private final CombinableMetric totalCommitTimeForAtomicUpserts = new CombinableMetricImpl(ATOMIC_UPSERT_COMMIT_TIME);
private final CombinableMetric totalCommitTimeForDeletes = new CombinableMetricImpl(DELETE_COMMIT_TIME);
private final CombinableMetric upsertMutationsSizeBytes = new CombinableMetricImpl(UPSERT_MUTATION_BYTES);
private final CombinableMetric deleteMutationsSizeBytes = new CombinableMetricImpl(DELETE_MUTATION_BYTES);
@@ -124,17 +127,18 @@ public class MutationMetricQueue {
INDEX_COMMIT_FAILURE_SIZE);
public static final MutationMetric EMPTY_METRIC =
- new MutationMetric(0,0,0,0,0,0,0,0,0,0,0,0,0,0);
+ new MutationMetric(0,0,0,0, 0, 0,0,0,0,0,0,0,0,0,0);
public MutationMetric(long numMutations, long upsertMutationsSizeBytes,
- long deleteMutationsSizeBytes, long commitTimeForUpserts, long commitTimeForDeletes,
- long numFailedMutations, long upsertMutationSqlCounterSuccess,
+ long deleteMutationsSizeBytes, long commitTimeForUpserts, long commitTimeForAtomicUpserts,
+ long commitTimeForDeletes, long numFailedMutations, long upsertMutationSqlCounterSuccess,
long deleteMutationSqlCounterSuccess, long totalMutationBytes,
long numOfPhase3Failed, long upsertBatchFailedSize,
long upsertBatchFailedCounter, long deleteBatchFailedSize,
long deleteBatchFailedCounter) {
this.numMutations.change(numMutations);
this.totalCommitTimeForUpserts.change(commitTimeForUpserts);
+ this.totalCommitTimeForAtomicUpserts.change(commitTimeForAtomicUpserts);
this.totalCommitTimeForDeletes.change(commitTimeForDeletes);
this.totalCommitTimeForMutations.change(commitTimeForUpserts + commitTimeForDeletes);
this.numFailedMutations.change(numFailedMutations);
@@ -154,6 +158,8 @@ public class MutationMetricQueue {
return totalCommitTimeForUpserts;
}
+ public CombinableMetric getTotalCommitTimeForAtomicUpserts() { return totalCommitTimeForAtomicUpserts; }
+
public CombinableMetric getTotalCommitTimeForDeletes() {
return totalCommitTimeForDeletes;
}
@@ -213,6 +219,7 @@ public class MutationMetricQueue {
public void combineMetric(MutationMetric other) {
this.numMutations.combine(other.numMutations);
this.totalCommitTimeForUpserts.combine(other.totalCommitTimeForUpserts);
+ this.totalCommitTimeForAtomicUpserts.combine(other.totalCommitTimeForAtomicUpserts);
this.totalCommitTimeForDeletes.combine(other.totalCommitTimeForDeletes);
this.totalCommitTimeForMutations.combine(other.totalCommitTimeForMutations);
this.numFailedMutations.combine(other.numFailedMutations);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
index b640f29..13ef856 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
@@ -68,6 +68,9 @@ import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_
import static org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
/**
* This is used by TableMetricsManager class to store instance of
@@ -121,7 +124,10 @@ public class TableClientMetrics {
DELETE_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_DELETE_AGGREGATE_FAILURE_SQL_COUNTER(
DELETE_AGGREGATE_FAILURE_SQL_COUNTER), TABLE_SELECT_AGGREGATE_SUCCESS_SQL_COUNTER(
SELECT_AGGREGATE_SUCCESS_SQL_COUNTER), TABLE_SELECT_AGGREGATE_FAILURE_SQL_COUNTER(
- SELECT_AGGREGATE_FAILURE_SQL_COUNTER);
+ SELECT_AGGREGATE_FAILURE_SQL_COUNTER),
+ TABLE_ATOMIC_UPSERT_SQL_COUNTER(ATOMIC_UPSERT_SQL_COUNTER),
+ TABLE_ATOMIC_UPSERT_COMMIT_TIME(ATOMIC_UPSERT_COMMIT_TIME),
+ TABLE_ATOMIC_UPSERT_SQL_QUERY_TIME(ATOMIC_UPSERT_SQL_QUERY_TIME);
private final MetricType metricType;
private PhoenixTableMetric metric;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index cf9518f..ca95c1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -1284,7 +1284,7 @@ public class PTableImpl implements PTable {
}
private void newMutations() {
- Mutation put = this.hasOnDupKey ? new Increment(this.key) : new Put(this.key);
+ Mutation put = new Put(this.key);
Delete delete = new Delete(this.key);
if (isWALDisabled()) {
put.setDurability(Durability.SKIP_WAL);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 06c8fd1..4500a18 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -2851,21 +2851,6 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
}
@Test
- public void testOnDupKeyWithGlobalIndex() throws Exception {
- Connection conn = DriverManager.getConnection(getUrl());
- try {
- conn.createStatement().execute("CREATE TABLE t1 (k integer not null primary key, v bigint)");
- conn.createStatement().execute("CREATE INDEX idx ON t1 (v)");
- conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1");
- fail();
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_WITH_GLOBAL_IDX.getErrorCode(), e.getErrorCode());
- } finally {
- conn.close();
- }
- }
-
- @Test
public void testUpdatePKOnDupKey() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
try {