You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by go...@apache.org on 2022/02/14 22:34:44 UTC
[phoenix] branch 4.x updated: PHOENIX-6639 Read repair for transforming table
This is an automated email from the ASF dual-hosted git repository.
gokcen pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 4752007 PHOENIX-6639 Read repair for transforming table
4752007 is described below
commit 475200783d62a842df73657c3d474decb42af1c0
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Tue Jan 4 15:58:37 2022 -0800
PHOENIX-6639 Read repair for transforming table
Signed-off-by: Gokcen Iskender <go...@gmail.com>
---
.../phoenix/end2end/transform/TransformToolIT.java | 110 ++++++++++++++++++
.../coprocessor/BaseScannerRegionObserver.java | 1 +
.../phoenix/coprocessor/ScanRegionObserver.java | 25 ++++
.../UngroupedAggregateRegionObserver.java | 3 +-
.../apache/phoenix/index/GlobalIndexChecker.java | 19 ++-
.../phoenix/iterate/TableResultIterator.java | 1 +
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 2 +-
.../org/apache/phoenix/query/QueryConstants.java | 4 +-
.../schema/transform/SystemTransformRecord.java | 12 +-
.../apache/phoenix/schema/transform/Transform.java | 6 +-
.../schema/transform/TransformMaintainer.java | 26 ++++-
.../java/org/apache/phoenix/util/ScanUtil.java | 127 ++++++++++++++-------
12 files changed, 278 insertions(+), 58 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
index 673f468..9ece889 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
@@ -450,6 +450,116 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
@Test
+ public void testTransformMutationReadRepair() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 0;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+ IndexToolIT.upsertRow(stmt1, 1);
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ IndexToolIT.upsertRow(stmt1, 2);
+
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+
+ Transform.doCutover(conn.unwrap(PhoenixConnection.class), record);
+ Transform.updateTransformRecord(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+
+ // Now do read repair
+ String select = "SELECT * FROM " + dataTableFullName;
+ ResultSet rs = conn.createStatement().executeQuery(select);
+ assertTrue(rs.next());
+ assertTrue(rs.next());
+ assertFalse(rs.next());
+
+ assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+ assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+ } finally {
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ }
+ }
+
+ @Test
+ public void testTransformIndexReadRepair() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = "IDX_" + generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 0;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)");
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName);
+ conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName +
+ " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+ IndexToolIT.upsertRow(stmt1, 1);
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ IndexToolIT.upsertRow(stmt1, 2);
+
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+
+ Transform.doCutover(conn.unwrap(PhoenixConnection.class), record);
+
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+
+ // Now do read repair
+ String select = "SELECT NAME, ZIP FROM " + dataTableFullName;
+ ResultSet rs = conn.createStatement().executeQuery(select);
+ assertTrue(rs.next());
+ assertTrue(rs.next());
+ assertFalse(rs.next());
+
+ assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+ assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+ } finally {
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ }
+ }
+
+ @Test
public void testTransformMutationFailureRepair() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 1217d23..9a63e88 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -144,6 +144,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String EMPTY_COLUMN_FAMILY_NAME = "_EmptyCFName";
public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
public static final String INDEX_ROW_KEY = "_IndexRowKey";
+ public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable";
public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index df4986b..e7dead8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
@@ -36,6 +37,9 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
@@ -66,6 +70,22 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
public static final String WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS =
"_WildcardScanIncludesDynCols";
+ private static boolean readRepairTransformingTable = false;
+ private static GlobalIndexChecker.GlobalIndexScanner globalIndexScanner;
+ private static GlobalIndexChecker globalIndexChecker = new GlobalIndexChecker();
+ private static GlobalIndexCheckerSource metricsSource = MetricsIndexerSourceFactory.getInstance().getGlobalIndexCheckerSource();
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ globalIndexChecker.start(env);
+ super.start(env);
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ globalIndexChecker.stop(env);
+ super.stop(env);
+ }
public static void serializeIntoScan(Scan scan, int limit,
List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
@@ -184,6 +204,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
NonAggregateRegionScannerFactory nonAggregateROUtil = new NonAggregateRegionScannerFactory(c.getEnvironment());
+ if (scan.getAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE) != null) {
+ readRepairTransformingTable = true;
+ globalIndexScanner = globalIndexChecker.new GlobalIndexScanner(c.getEnvironment(), scan, s, metricsSource);
+ return nonAggregateROUtil.getRegionScanner(scan, globalIndexScanner);
+ }
return nonAggregateROUtil.getRegionScanner(scan, s);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 55f3f8e..bb78acb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
@@ -77,7 +76,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
-import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.exception.IndexWriteException;
@@ -400,6 +398,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
});
}
+
boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
int offsetToBe = 0;
if (localIndexScan) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index cb3c429..3c25587 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.index;
import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
+import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import static org.apache.phoenix.util.ScanUtil.getDummyResult;
@@ -63,6 +64,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
@@ -122,7 +124,7 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver {
* An instance of this class is created for each scanner on an index
* and used to verify individual rows and rebuild them if they are not valid
*/
- private class GlobalIndexScanner extends BaseRegionScanner {
+ public class GlobalIndexScanner extends BaseRegionScanner {
private RegionScanner scanner;
private long ageThreshold;
private Scan scan;
@@ -543,9 +545,18 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver {
while (cellIterator.hasNext()) {
cell = cellIterator.next();
if (isEmptyColumn(cell)) {
- if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
- VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
- return false;
+ if (indexMaintainer instanceof TransformMaintainer) {
+ // This is a transforming table. After cutover, if there are new mutations on the table,
+ // their empty col value would already be marked verified. So, we are only interested in unverified ones.
+ if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) {
+ return false;
+ }
+ } else {
+ if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
+ return false;
+ }
}
// Empty column is not supposed to be returned to the client except it is the only column included
// in the scan
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 95e181c..6e2e25d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NO
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import java.io.IOException;
import java.sql.SQLException;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 298e7a9..6861ecc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -224,7 +224,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final String OLD_METADATA = "OLD_METADATA";
public static final String NEW_METADATA = "NEW_METADATA";
public static final String TRANSFORM_FUNCTION = "TRANSFORM_FUNCTION";
- public static final String TRANSFORM_TABLE_TTL = "864000";
+ public static final String TRANSFORM_TABLE_TTL = "7776000"; // 90 days
public static final int TTL_FOR_MUTEX = 15 * 60; // 15min
public static final String ARRAY_SIZE = "ARRAY_SIZE";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 9dc55c4..bc8d161 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -567,14 +567,14 @@ public interface QueryConstants {
TRANSFORM_RETRY_COUNT + " INTEGER NULL," +
TRANSFORM_START_TS + " TIMESTAMP NULL," +
TRANSFORM_LAST_STATE_TS + " TIMESTAMP NULL," +
- OLD_METADATA + " VARCHAR NULL,\n" +
+ OLD_METADATA + " VARBINARY NULL,\n" +
NEW_METADATA + " VARCHAR NULL,\n" +
TRANSFORM_FUNCTION + " VARCHAR NULL\n" +
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
TENANT_ID + "," + TABLE_SCHEM + "," + LOGICAL_TABLE_NAME + "))\n" +
HConstants.VERSIONS + "=%s,\n" +
HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" +
- HColumnDescriptor.TTL + "=" + TRANSFORM_TABLE_TTL + ",\n" + // 10 days
+ HColumnDescriptor.TTL + "=" + TRANSFORM_TABLE_TTL + ",\n" + // 90 days
HTableDescriptor.SPLIT_POLICY + "='"
+ SystemTaskSplitPolicy.class.getName() + "',\n" +
TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
index 63378bb..d6fec5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
@@ -45,14 +45,14 @@ public class SystemTransformRecord {
private final Integer transformRetryCount;
private final Timestamp startTs;
private final Timestamp lastStateTs;
- private final String oldMetadata;
+ private final byte[] oldMetadata;
private final String newMetadata;
private final String transformFunction;
public SystemTransformRecord(PTable.TransformType transformType,
String schemaName, String logicalTableName, String tenantId, String newPhysicalTableName, String logicalParentName,
String transformStatus, String transformJobId, Integer transformRetryCount, Timestamp startTs,
- Timestamp lastStateTs, String oldMetadata, String newMetadata, String transformFunction) {
+ Timestamp lastStateTs, byte[] oldMetadata, String newMetadata, String transformFunction) {
this.transformType = transformType;
this.schemaName = schemaName;
this.tenantId = tenantId;
@@ -119,7 +119,7 @@ public class SystemTransformRecord {
return lastStateTs;
}
- public String getOldMetadata() {
+ public byte[] getOldMetadata() {
return oldMetadata;
}
public String getNewMetadata() {
@@ -149,7 +149,7 @@ public class SystemTransformRecord {
private int transformRetryCount =0;
private Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
private Timestamp lastStateTs;
- private String oldMetadata;
+ private byte[] oldMetadata;
private String newMetadata;
private String transformFunction;
@@ -214,7 +214,7 @@ public class SystemTransformRecord {
return this;
}
- public SystemTransformBuilder setOldMetadata(String oldMetadata) {
+ public SystemTransformBuilder setOldMetadata(byte[] oldMetadata) {
this.oldMetadata = oldMetadata;
return this;
}
@@ -268,7 +268,7 @@ public class SystemTransformRecord {
builder.setTransformRetryCount(resultSet.getInt(col++));
builder.setStartTs(resultSet.getTimestamp(col++));
builder.setLastStateTs(resultSet.getTimestamp(col++));
- builder.setOldMetadata(resultSet.getString(col++));
+ builder.setOldMetadata(resultSet.getBytes(col++));
builder.setNewMetadata(resultSet.getString(col++));
builder.setTransformFunction(resultSet.getString(col++));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index a2e3f55..1d81911 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -109,7 +109,7 @@ public class Transform {
long sequenceNum, PTable.TransformType transformType) throws SQLException {
try {
String newMetadata = JacksonUtil.getObjectWriter().writeValueAsString(changingProperties);
- String oldMetadata = "";
+ byte[] oldMetadata = PTableImpl.toProto(table).toByteArray();
String newPhysicalTableName = "";
SystemTransformRecord.SystemTransformBuilder transformBuilder = new SystemTransformRecord.SystemTransformBuilder();
String schema = table.getSchemaName()!=null ? table.getSchemaName().getString() : null;
@@ -401,9 +401,9 @@ public class Transform {
stmt.setNull(colNum++, Types.TIMESTAMP);
}
if (systemTransformParams.getOldMetadata() != null) {
- stmt.setString(colNum++, systemTransformParams.getOldMetadata());
+ stmt.setBytes(colNum++, systemTransformParams.getOldMetadata());
} else {
- stmt.setNull(colNum++, Types.VARCHAR);
+ stmt.setNull(colNum++, Types.VARBINARY);
}
if (systemTransformParams.getNewMetadata() != null) {
stmt.setString(colNum++, systemTransformParams.getNewMetadata());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index 734abe1..8d08b10 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -64,6 +64,7 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -76,7 +77,7 @@ import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCOD
public class TransformMaintainer extends IndexMaintainer {
private boolean isMultiTenant;
- // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column
+ // expressions that are not present in the row key of the old table, the expression can also refer to a regular column
private List<Expression> newTableExpressions;
private Set<ColumnReference> newTableColumns;
@@ -196,6 +197,19 @@ public class TransformMaintainer extends IndexMaintainer {
}
}
+ public Set<ColumnReference> getAllColumnsForDataTable() {
+ Set<ColumnReference> result = Sets.newLinkedHashSetWithExpectedSize(newTableExpressions.size() + coveredColumnsMap.size());
+ result.addAll(newTableColumns);
+ for (ColumnReference colRef : coveredColumnsMap.keySet()) {
+ if (oldTableImmutableStorageScheme == PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+ result.add(colRef);
+ } else {
+ result.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
+ }
+ }
+ return result;
+ }
+
/*
* Build the old table row key
*/
@@ -376,6 +390,16 @@ public class TransformMaintainer extends IndexMaintainer {
for (ServerCachingProtos.ColumnInfo info : newTblColumnInfoList) {
maintainer.newTableColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName()));
}
+ maintainer.newTableExpressions = new ArrayList<>();
+ try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getNewTableExpressions().toByteArray())) {
+ DataInput input = new DataInputStream(stream);
+ while (stream.available() > 0) {
+ int expressionOrdinal = WritableUtils.readVInt(input);
+ Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
+ expression.readFields(input);
+ maintainer.newTableExpressions.add(expression);
+ }
+ }
// proto doesn't support single byte so need an explicit cast here
maintainer.newTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getNewTableEncodingScheme());
maintainer.newTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getNewTableImmutableStorageScheme());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 18972fa..c1d3468 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -39,6 +39,7 @@ import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.BaseQueryPlan;
@@ -66,11 +67,16 @@ import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.tool.SchemaExtractionProcessor;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
@@ -1129,46 +1135,89 @@ public class ScanUtil {
}
public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException {
- if (table.isTransactional() || table.getType() != PTableType.INDEX) {
- return;
- }
+ boolean isTransforming = (table.getTransformingNewTable() != null);
PTable indexTable = table;
- if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
- return;
- }
- String schemaName = indexTable.getParentSchemaName().getString();
- String tableName = indexTable.getParentTableName().getString();
- PTable dataTable;
- try {
- dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName));
- } catch (TableNotFoundException e) {
- // This index table must be being deleted. No need to set the scan attributes
- return;
- }
- // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child
- // view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work
- if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
- int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
- String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
- indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
- }
- if (!dataTable.getIndexes().contains(indexTable)) {
- return;
- }
- if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection);
- scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
- }
- scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
- scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes());
- IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection);
- byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
- byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
- scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
- scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
- if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) {
- BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+ // A transforming index table can be repaired in the regular path via the GlobalIndexChecker coprocessor on it.
+ // phoenixConnection is closed when it is called from mappers
+ if (!phoenixConnection.isClosed() && table.getType() == PTableType.TABLE && isTransforming) {
+ SystemTransformRecord systemTransformRecord = Transform.getTransformRecord(indexTable.getSchemaName(), indexTable.getTableName(),
+ null, phoenixConnection.getTenantId(), phoenixConnection);
+ if (systemTransformRecord == null) {
+ return;
+ }
+ // Old table is still active and cutover didn't happen yet, so there is no need to do read repair
+ if (!systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+ return;
+ }
+ byte[] oldTableBytes = systemTransformRecord.getOldMetadata();
+ if (oldTableBytes == null || oldTableBytes.length == 0) {
+ return;
+ }
+ PTable oldTable = null;
+ try {
+ oldTable = PTableImpl.createFromProto(PTableProtos.PTable.parseFrom(oldTableBytes));
+ } catch (IOException e) {
+ LOGGER.error("Cannot parse old table info for read repair for table " + table.getName());
+ return;
+ }
+ TransformMaintainer indexMaintainer = indexTable.getTransformMaintainer(oldTable, phoenixConnection);
+ // This is the path where we are reading from the newly transformed table
+ if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ TransformMaintainer.serialize(oldTable, ptr, indexTable, phoenixConnection);
+ scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+ }
+ scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
+ scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, oldTable.getPhysicalName().getBytes());
+ byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+ scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
+ scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
+ scan.setAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE, TRUE_BYTES);
+ } else {
+ if (table.isTransactional() || table.getType() != PTableType.INDEX) {
+ return;
+ }
+ if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
+ return;
+ }
+
+ String schemaName = indexTable.getParentSchemaName().getString();
+ String tableName = indexTable.getParentTableName().getString();
+ PTable dataTable;
+ try {
+ dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName));
+ } catch (TableNotFoundException e) {
+ // This index table must be being deleted. No need to set the scan attributes
+ return;
+ }
+ // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child
+ // view. Thus, we need to recreate a PTable object with the correct table name for the rest of this code to work
+ if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+ int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+ String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
+ indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
+ }
+ if (!dataTable.getIndexes().contains(indexTable)) {
+ return;
+ }
+
+ if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection);
+ scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+ }
+ scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
+ scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes());
+ IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection);
+ byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+ scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
+ scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
+ if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) {
+ BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+ }
+ addEmptyColumnToScan(scan, emptyCF, emptyCQ);
}
}