Posted to commits@phoenix.apache.org by go...@apache.org on 2022/01/11 05:11:52 UTC
[phoenix] branch 4.x updated: PHOENIX-6620 TransformTool to fix unverified rows and do validation
This is an automated email from the ASF dual-hosted git repository.
gokcen pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new d32745d PHOENIX-6620 TransformTool to fix unverified rows and do validation
d32745d is described below
commit d32745d8c4d46f1ff20f0b346af3d21ffc1e65e8
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Wed Sep 8 17:17:10 2021 -0700
PHOENIX-6620 TransformTool to fix unverified rows and do validation
Signed-off-by: Gokcen Iskender <go...@gmail.com>
---
.../apache/phoenix/end2end/TransformToolIT.java | 327 +++++++++++++++++++--
.../end2end/index/ImmutableIndexExtendedIT.java | 2 +-
.../apache/phoenix/end2end/index/TransformIT.java | 31 ++
.../apache/phoenix/exception/SQLExceptionCode.java | 3 +-
.../index/PhoenixIndexImportDirectReducer.java | 18 +-
.../transform/PhoenixTransformReducer.java | 38 ++-
.../transform/PhoenixTransformRepairMapper.java | 205 +++++++++++++
.../phoenix/mapreduce/transform/TransformTool.java | 122 +++++++-
.../mapreduce/util/PhoenixConfigurationUtil.java | 17 ++
.../org/apache/phoenix/schema/MetaDataClient.java | 4 +
.../schema/tool/SchemaExtractionProcessor.java | 8 +
.../schema/transform/TransformMaintainer.java | 56 +++-
12 files changed, 767 insertions(+), 64 deletions(-)
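For orientation before the diff: the new options this commit adds (-fu/--fix-unverified, -v/--verify, and -fn/--from-new, defined in TransformTool below) can be driven through the standard Hadoop Tool runner. A minimal sketch, assuming a --data-table option analogous to IndexTool's (that option is outside this excerpt) and hypothetical table names:

    // Illustrative only, not part of the commit: exercises the new options.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.transform.TransformTool;

    public class TransformToolExample {
        public static void main(String[] args) throws Exception {
            int status = ToolRunner.run(new Configuration(), new TransformTool(),
                    new String[] {
                            "--schema=MY_SCHEMA",        // hypothetical schema
                            "--data-table=MY_TABLE",     // assumed option name, mirroring IndexTool
                            "-fu",                       // repair unverified rows (new in this commit)
                            "-v", "BEFORE",              // verify before rebuilding (new in this commit)
                            "-op", "/tmp/transform-out"  // output path, as used by TransformToolIT
                    });
            System.exit(status);
        }
    }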
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
index 5002c8c..52646aa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TransformToolIT.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.index.SingleCellIndexIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.transform.TransformTool;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -54,6 +55,18 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import static org.apache.phoenix.end2end.index.ImmutableIndexExtendedIT.getRowCountForEmptyColValue;
+import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.apache.phoenix.util.TestUtil.getRowCount;
import static org.junit.Assert.assertEquals;
@@ -71,7 +84,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
public TransformToolIT() {
StringBuilder optionBuilder = new StringBuilder();
- optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE, TTL=18000 ");
+ optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE ");
if (!mutable) {
optionBuilder.append(", IMMUTABLE_ROWS=true ");
}
@@ -86,16 +99,21 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
+ serverProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
new ReadOnlyProps(clientProps.entrySet().iterator()));
}
private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows) throws SQLException {
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+ }
+
+ private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows, String tableOptions) throws SQLException {
String stmString1 =
"CREATE TABLE IF NOT EXISTS " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
- + tableDDLOptions;
+ + tableOptions;
conn.createStatement().execute(stmString1);
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
@@ -125,7 +143,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
assertNotNull(record);
List<String> args = getArgList(schemaName, dataTableName, null,
- null, null, null, false, false, false, false);
+ null, null, null, false, false, false, false, false);
runTransformTool(args.toArray(new String[0]), 0);
record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
assertEquals(PTable.TransformStatus.COMPLETED.name(), record.getTransformStatus());
@@ -167,7 +185,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
assertNotNull(record);
List<String> args = getArgList(schemaName, dataTableName, null,
- null, null, null, true, false, false, false);
+ null, null, null, true, false, false, false, false);
runTransformTool(args.toArray(new String[0]), 0);
record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
@@ -187,7 +205,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
assertNotNull(record);
List<String> args = getArgList(schemaName, dataTableName, null,
- null, null, null, false, true, false, false);
+ null, null, null, false, true, false, false, false);
runTransformTool(args.toArray(new String[0]), 0);
record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
@@ -214,7 +232,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
conn.setAutoCommit(true);
pauseTableTransform(schemaName, dataTableName, conn);
List<String> args = getArgList(schemaName, dataTableName, null,
- null, null, null, false, false, true, false);
+ null, null, null, false, false, true, false, false);
runTransformTool(args.toArray(new String[0]), 0);
SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
@@ -282,7 +300,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
" SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
List<String> args = getArgList(schemaName, dataTableName, null,
- null, null, null, false, false, false, false);
+ null, null, null, false, false, false, false, false);
// split if data table more than 3 regions
args.add("--autosplit=3");
@@ -336,7 +354,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
" SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
List<String> args = getArgList(schemaName, dataTableName, null,
- null, null, null, false, false, false, false);
+ null, null, null, false, false, false, false, false);
runTransformTool(args.toArray(new String[0]), 0);
SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, newDataTN.getNameAsString());
@@ -382,7 +400,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
assertNotNull(record);
List<String> args = getArgList(schemaName, dataTableName, indexTableName,
- null, null, null, false, false, false, false);
+ null, null, null, false, false, false, false, false);
runTransformTool(args.toArray(new String[0]), 0);
record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
@@ -413,13 +431,24 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String parentViewName = "VWP_" + generateUniqueName();
+ String viewName = "VW_" + generateUniqueName();
+ String viewIdxName = "VWIDX_" + generateUniqueName();
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
- IndexRegionObserver.setFailPreIndexUpdatesForTesting(true);
int numOfRows = 0;
createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+ String createParentViewSql = "CREATE VIEW " + parentViewName + " ( PARENT_VIEW_COL1 VARCHAR ) AS SELECT * FROM " + dataTableFullName;
+ conn.createStatement().execute(createParentViewSql);
+
+ String createViewSql = "CREATE VIEW " + viewName + " ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + parentViewName;
+ conn.createStatement().execute(createViewSql);
+
+ String createViewIdxSql = "CREATE INDEX " + viewIdxName + " ON " + viewName + " (VIEW_COL1) include (VIEW_COL2) ";
+ conn.createStatement().execute(createViewIdxSql);
+
SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
@@ -427,8 +456,8 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
assertNotNull(record);
assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
-
- String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(true);
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", viewName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
try {
IndexToolIT.upsertRow(stmt1, 1);
@@ -436,36 +465,280 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
} catch (Exception e) {
}
assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
- assertEquals(0, getRowCount(conn,dataTableFullName));
+ assertEquals(0, getRowCount(conn, dataTableFullName));
IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
IndexToolIT.upsertRow(stmt1, 2);
+ IndexToolIT.upsertRow(stmt1, 3);
- assertEquals(1, getRowCount(conn, record.getNewPhysicalTableName()));
- assertEquals(1, getRowCount(conn,dataTableFullName));
+ assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(2, getRowCount(conn, dataTableFullName));
IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
try {
- IndexToolIT.upsertRow(stmt1, 3);
+ IndexToolIT.upsertRow(stmt1, 4);
fail("Data table upsert should have failed");
} catch (Exception e) {
}
- assertEquals(2, getRowCount(conn, record.getNewPhysicalTableName()));
- assertEquals(1, getRowCount(conn,dataTableFullName));
+ try {
+ IndexToolIT.upsertRow(stmt1, 5);
+ fail("Data table upsert should have failed");
+ } catch (Exception e) {
+ }
+ assertEquals(4, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(2, getRowCount(conn, dataTableFullName));
IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
List<String> args = getArgList(schemaName, dataTableName, null,
- null, null, null, false, false, false, false);
+ null, null, null, false, false, false, false, true);
runTransformTool(args.toArray(new String[0]), 0);
- //TODO: Implement GlobalIndexChecker like repair for unverified rows
- assertEquals(1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
+ assertEquals(2, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
.getTable(Bytes.toBytes(dataTableFullName)), false));
-// assertEquals(1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
-// .getTable(Bytes.toBytes(record.getNewPhysicalTableName())), false));
+ assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+ assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+ } finally {
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ }
+ }
+
+ @Test
+ public void testTransformVerifiedForTransactionalTable() throws Exception {
+ // TODO: Column encoding is not supported for OMID. Add an OMID test for other types of transforms.
+ // For now, we don't support transforms other than storage scheme and column encoding changes.
+ //testVerifiedForTransactionalTable("OMID");
+ testVerifiedForTransactionalTable("TEPHRA");
+ }
+
+ private void testVerifiedForTransactionalTable(String provider) throws Exception {
+ String tableOptions = tableDDLOptions + ", TRANSACTIONAL=true, TRANSACTION_PROVIDER='" + provider + "'";
+
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableOptions);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false, false);
+ runTransformTool(args.toArray(new String[0]), 0);
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+
+
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+ // Run again to check that VERIFIED row still remains verified
+ runTransformTool(args.toArray(new String[0]), 0);
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+ assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+
+ // We will have two rows with empty col = 'x' since there is no IndexRegionObserver for transactional tables
+ IndexToolIT.upsertRow(stmt1, ++numOfRows);
+ IndexToolIT.upsertRow(stmt1, ++numOfRows);
+
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+ assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+ assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), EMPTY_COLUMN_VALUE_BYTES));
+
+ // Even when we run TransformTool again, the verified bit is not cleared and the empty column values stay as they are
+ args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false, false);
+ runTransformTool(args.toArray(new String[0]), 0);
+ assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+ assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+ assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), EMPTY_COLUMN_VALUE_BYTES));
+ }
+ }
+
+ @Test
+ public void testTransformVerify() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 2;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(true);
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+ try {
+ IndexToolIT.upsertRow(stmt1, numOfRows+1);
+ fail("New table upsert should have failed");
+ } catch (Exception e) {
+ }
+ // We haven't run the transform tool yet, so the new table should have 0 rows.
+ assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ IndexToolIT.upsertRow(stmt1, ++numOfRows);
+ IndexToolIT.upsertRow(stmt1, ++numOfRows);
+
+ // 2 rows are missing from the new table since they are the initial rows
+ assertEquals(numOfRows-2, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ try {
+ IndexToolIT.upsertRow(stmt1, numOfRows+1);
+ fail("Data table upsert should have failed");
+ } catch (Exception e) {
+ }
+ // Three unverified rows in the new table; the original 2 rows are missing since the transform tool did not run.
+ // One unverified row is not in the data table; 2 unverified rows are in the data table.
+ assertEquals(numOfRows-1, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false, false);
+ args.add("-v");
+ args.add(IndexTool.IndexVerifyType.ONLY.getValue());
+ // Run validation only (ONLY verify type) and check the counters
+ TransformTool transformTool = runTransformTool(args.toArray(new String[0]), 0);
+ assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+ assertEquals(0, transformTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+ assertEquals(numOfRows-2, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(2, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(2, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+
+ args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false, false);
+ args.add("-v");
+ args.add(IndexTool.IndexVerifyType.AFTER.getValue());
+ // Run with AFTER verification and check that the 2 missing (initial) rows are built.
+ transformTool = runTransformTool(args.toArray(new String[0]), 0);
+ assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+ assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+ assertEquals(numOfRows, transformTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+
+ int numOfRows2 = 2;
+ String dataTableName2 = generateUniqueName();
+ String dataTableFullName2 = SchemaUtil.getTableName(schemaName, dataTableName2);
+ createTableAndUpsertRows(conn, dataTableFullName2, numOfRows2);
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName2 +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ record = Transform.getTransformRecord(schemaName, dataTableName2, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
+ // The original 2 rows are missing since the transform tool has not run
+ assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(numOfRows2, getRowCount(conn, dataTableFullName2));
+
+ args = getArgList(schemaName, dataTableName2, null,
+ null, null, null, false, false, false, false, false);
+ args.add("-v");
+ args.add(IndexTool.IndexVerifyType.BEFORE.getValue());
+ // Run with BEFORE verification and check the counters.
+ transformTool = runTransformTool(args.toArray(new String[0]), 0);
+ assertEquals(numOfRows2, transformTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ assertEquals(numOfRows2, transformTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+ assertEquals(numOfRows2, transformTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+ assertEquals(numOfRows2-2, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(numOfRows2, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, transformTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+ } finally {
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ }
+ }
+
+ @Test
+ public void testTransformVerify_shouldFixUnverified() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ int numOfRowsInNewTbl = 0;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows);
+ SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+ // We haven't run the transform tool yet, so the new table should have 0 rows.
+ assertEquals(0, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ IndexToolIT.upsertRow(stmt1, ++numOfRows);
+ numOfRowsInNewTbl++;
+ IndexToolIT.upsertRow(stmt1, ++numOfRows);
+ numOfRowsInNewTbl++;
+
+ // 1 row is missing from the new table since it is the initial row
+ assertEquals(numOfRows-1, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ try {
+ numOfRowsInNewTbl++;
+ IndexToolIT.upsertRow(stmt1, numOfRows+1);
+ fail("Data table upsert should have failed");
+ } catch (Exception e) {
+ }
+ try {
+ numOfRowsInNewTbl++;
+ IndexToolIT.upsertRow(stmt1, numOfRows+2);
+ fail("Data table upsert should have failed");
+ } catch (Exception e) {
+ }
+ // Four unverified rows in the new table; the original 1 row is missing since the transform tool did not run.
+ // Two of the unverified rows are not in the data table.
+ assertEquals(numOfRowsInNewTbl, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false, true);
+ args.add("-v");
+ args.add(IndexTool.IndexVerifyType.BEFORE.getValue());
+ TransformTool transformTool = runTransformTool(args.toArray(new String[0]), 0);
+ // Run with BEFORE verification and the fix-unverified option, and check that the 2 extra unverified rows (with no corresponding data table row) are detected.
+ assertEquals(numOfRowsInNewTbl, transformTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ assertEquals(2, transformTool.getJob().getCounters().findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT).getValue());
} finally {
IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
@@ -475,7 +748,8 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
public static List<String> getArgList(String schemaName, String dataTable, String indxTable, String tenantId,
Long startTime, Long endTime,
- boolean shouldAbort, boolean shouldPause, boolean shouldResume, boolean isPartial) {
+ boolean shouldAbort, boolean shouldPause, boolean shouldResume, boolean isPartial,
+ boolean shouldFixUnverified) {
List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("--schema=" + schemaName);
@@ -515,6 +789,9 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
if (isPartial) {
args.add("-pt");
}
+ if (shouldFixUnverified) {
+ args.add("-fu");
+ }
return args;
}
@@ -523,7 +800,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT{
String dataTable, String indexTable, String tenantId,
Long startTime, Long endTime) {
List<String> args = getArgList(schemaName, dataTable, indexTable,
- tenantId, startTime, endTime, false, false, false, false);
+ tenantId, startTime, endTime, false, false, false, false, false);
args.add("-op");
args.add("/tmp/" + UUID.randomUUID().toString());
return args.toArray(new String[0]);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java
index 5d3b515..13e7e86 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexExtendedIT.java
@@ -217,7 +217,7 @@ public class ImmutableIndexExtendedIT extends ParallelStatsDisabledIT {
TestUtil.waitForIndexState(conn, indexTable, PIndexState.ACTIVE);
}
- private static int getRowCountForEmptyColValue(Connection conn, String tableName,
+ public static int getRowCountForEmptyColValue(Connection conn, String tableName,
byte[] valueBytes) throws IOException, SQLException {
PTable table = PhoenixRuntime.getTable(conn, tableName);
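The helper made public here counts rows by the value of their empty column; its body is truncated in this hunk, so the following is only a sketch of the idea, assuming Phoenix's SchemaUtil and EncodedColumnsUtil helpers for locating the empty column:

    // Sketch, not the actual ImmutableIndexExtendedIT body: count rows whose empty
    // column equals valueBytes (e.g. VERIFIED_BYTES, UNVERIFIED_BYTES).
    public static int getRowCountForEmptyColValue(Connection conn, String tableName,
            byte[] valueBytes) throws IOException, SQLException {
        PTable table = PhoenixRuntime.getTable(conn, tableName);
        byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);                    // assumed helper
        byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); // assumed helper
        Scan scan = new Scan();
        scan.setFilter(new SingleColumnValueFilter(emptyCF, emptyCQ,
                CompareFilter.CompareOp.EQUAL, valueBytes));
        int count = 0;
        try (Table hTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
                     .getTable(table.getPhysicalName().getBytes());
             ResultScanner scanner = hTable.getScanner(scan)) {
            while (scanner.next() != null) {
                count++;
            }
        }
        return count;
    }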
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
index 136da65..e7aa5aa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TransformIT.java
@@ -170,6 +170,37 @@ public class TransformIT extends ParallelStatsDisabledIT {
}
@Test
+ public void testTransformFailsForViewIndex() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
+ conn.setAutoCommit(true);
+ String schema = "S_" + generateUniqueName();
+ String tableName = "TBL_" + generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schema, tableName);
+ String viewName = "VW_" + generateUniqueName();
+ String viewIdxName = "VWIDX_" + generateUniqueName();
+
+ String createTableSql = "CREATE TABLE " + fullTableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, " +
+ "V1 VARCHAR, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) ";
+ conn.createStatement().execute(createTableSql);
+ assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, fullTableName);
+
+ String createViewSql = "CREATE VIEW " + viewName + " ( VIEW_COL1 INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM " + fullTableName;
+ conn.createStatement().execute(createViewSql);
+
+ String createViewIdxSql = "CREATE INDEX " + viewIdxName + " ON " + viewName + " (VIEW_COL1) include (VIEW_COL2) ";
+ conn.createStatement().execute(createViewIdxSql);
+
+ try {
+ conn.createStatement().execute("ALTER INDEX " + viewIdxName + " ON " + viewName
+ + " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.CANNOT_TRANSFORM_VIEW_INDEX.getErrorCode(), e.getErrorCode());
+ }
+ }
+ }
+
+ @Test
public void testTransformForLiveMutations_mutatingTable() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) {
conn.setAutoCommit(true);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 9fe6687..f6eb8e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -567,7 +567,8 @@ public enum SQLExceptionCode {
"Error writing DDL change to external schema registry"),
CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE(910, "43M21",
- "Cannot transform an index or a table who is already going through a transform.");
+ "Cannot transform an index or a table who is already going through a transform."),
+ CANNOT_TRANSFORM_VIEW_INDEX(911, "43M22", "Cannot transform a view index. Consider creating a new view index.");
private final int errorCode;
private final String sqlState;
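Client code can branch on the new code the same way the TransformIT test added below does; a small illustrative catch with hypothetical names:

    // Illustrative handling of the new error code (911, SQLState 43M22).
    try {
        conn.createStatement().execute("ALTER INDEX MY_VIEW_IDX ON MY_VIEW"
                + " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
    } catch (SQLException e) {
        if (e.getErrorCode() == SQLExceptionCode.CANNOT_TRANSFORM_VIEW_INDEX.getErrorCode()) {
            // View indexes cannot be transformed; create a new view index instead.
        }
    }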
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index a341d2b..3f5462c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -60,7 +60,7 @@ public class PhoenixIndexImportDirectReducer extends
private String indexTableName;
private byte[] indexTableNameBytes;
- private void updateCounters(IndexTool.IndexVerifyType verifyType,
+ protected void updateCounters(IndexTool.IndexVerifyType verifyType,
Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context)
throws IOException {
Configuration configuration = context.getConfiguration();
@@ -162,13 +162,15 @@ public class PhoenixIndexImportDirectReducer extends
}
}
- if (PhoenixConfigurationUtil.getIsTransforming(context.getConfiguration())) {
- try {
- Transform.completeTransform(ConnectionUtil
- .getInputConnection(context.getConfiguration()), context.getConfiguration());
- } catch (Exception e) {
- LOGGER.error(" Failed to complete transform", e);
- throw new RuntimeException(e.getMessage());
+ if (verifyType != IndexTool.IndexVerifyType.ONLY) {
+ if (PhoenixConfigurationUtil.getIsTransforming(context.getConfiguration())) {
+ try {
+ Transform.completeTransform(ConnectionUtil
+ .getInputConnection(context.getConfiguration()), context.getConfiguration());
+ } catch (Exception e) {
+ LOGGER.error(" Failed to complete transform", e);
+ throw new RuntimeException(e.getMessage());
+ }
}
}
}
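Since completeTransform is now skipped when the verify type is ONLY, a validation-only pass leaves the transform open and the tool can be re-run to build. A rough sketch of that two-pass usage (the -v values come from this commit; --data-table and the names are assumptions):

    // Hypothetical two-pass flow: gather verification counters first, then build.
    Configuration conf = new Configuration();
    ToolRunner.run(conf, new TransformTool(),
            new String[] { "--schema=S", "--data-table=T", "-v", "ONLY" });  // counters only; transform stays open
    ToolRunner.run(conf, new TransformTool(),
            new String[] { "--schema=S", "--data-table=T", "-v", "AFTER" }); // rebuild, verify, complete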
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
index 4a1c4fe..6b5b217 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
@@ -21,48 +21,54 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.IndexToolUtil;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectReducer;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.transform.Transform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
+import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
+
/**
* Reducer class whose only task is to complete the transform.
*/
public class PhoenixTransformReducer extends
- Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> {
+ PhoenixIndexImportDirectReducer {
private AtomicBoolean calledOnce = new AtomicBoolean(false);
private static final Logger LOGGER =
LoggerFactory.getLogger(PhoenixTransformReducer.class);
@Override
- protected void setup(Context context) throws IOException {
- }
-
- @Override
protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1,
Context context)
throws IOException, InterruptedException {
if (!calledOnce.compareAndSet(false, true)) {
return;
}
-
- try (final Connection
- connection = ConnectionUtil.getInputConnection(context.getConfiguration())){
- // Complete full Transform and add a partial transform
- Transform.completeTransform(connection, context.getConfiguration());
- } catch (Exception e) {
- LOGGER.error(" Failed to complete transform", e);
- throw new RuntimeException(e.getMessage());
+ IndexTool.IndexVerifyType verifyType = getIndexVerifyType(context.getConfiguration());
+ if (verifyType != IndexTool.IndexVerifyType.NONE) {
+ updateCounters(verifyType, context);
}
- }
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException{
+ if (verifyType != IndexTool.IndexVerifyType.ONLY) {
+ try (final Connection
+ connection = ConnectionUtil.getInputConnection(context.getConfiguration())) {
+ // Complete full Transform and add a partial transform
+ Transform.completeTransform(connection, context.getConfiguration());
+ } catch (Exception e) {
+ LOGGER.error(" Failed to complete transform", e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
}
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformRepairMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformRepairMapper.java
new file mode 100644
index 0000000..48edd8d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformRepairMapper.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.index.DirectHTableWriter;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.TreeSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+/**
+ * Mapper that repairs unverified rows in the new (transformed) table against the old data table.
+ */
+public class PhoenixTransformRepairMapper extends TableMapper<ImmutableBytesWritable, IntWritable> {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PhoenixTransformRepairMapper.class);
+ private DirectHTableWriter writer;
+ private PhoenixConnection connection;
+ private ImmutableBytesPtr maintainers;
+ private int batchSize;
+ private List<Mutation> mutations;
+
+ @Override
+ protected void setup(final Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ final Configuration configuration = context.getConfiguration();
+ writer = new DirectHTableWriter(configuration);
+ try {
+ final Properties overrideProps = new Properties();
+ String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+ String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+ if (txScnValue == null && scn != null) {
+ overrideProps.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, scn);
+ }
+ connection = ConnectionUtil.getOutputConnection(configuration, overrideProps).unwrap(PhoenixConnection.class);
+ maintainers = new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration));
+ int maxSize =
+ connection.getQueryServices().getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+ this.mutations = Lists.newArrayListWithExpectedSize(batchSize);
+ LOGGER.info("Mutation Batch Size = " + batchSize);
+ } catch (SQLException e) {
+ tryClosingResources();
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ protected void map(ImmutableBytesWritable row, Result value, Context context)
+ throws IOException, InterruptedException {
+ context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+ String oldTableName = PhoenixConfigurationUtil.getIndexToolDataTableName(context.getConfiguration());
+ Set<byte[]> extraRowsInNewTable = new TreeSet<>(Bytes.BYTES_COMPARATOR); // byte[] needs a comparator-based set for content equality
+ try (Table oldHTable = connection.getQueryServices().getTable(Bytes.toBytes(oldTableName))) {
+ for (Cell cell : value.rawCells()) {
+ Scan buildNewTableScan = new Scan();
+ // The following attributes are set to instruct UngroupedAggregateRegionObserver to do partial rebuild
+ buildNewTableScan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, TRUE_BYTES);
+ buildNewTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, maintainers.get());
+ buildNewTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+ buildNewTableScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+ IndexMaintainer transformMaintainer = TransformMaintainer.deserialize(maintainers.get()).get(0);
+
+ byte[] newRowKey = CellUtil.cloneRow(cell);
+ // Rebuild the new row from the corresponding row in the old data table
+ // To implement rowkey reordering etc, we need to rebuild the rowkey. For now it is the same
+ buildNewTableScan.withStartRow(newRowKey, true);
+ buildNewTableScan.withStopRow(newRowKey, true);
+ buildNewTableScan.setTimeRange(0, cell.getTimestamp()+1);
+ // Pass the index row key to the partial index builder which will rebuild the index row and check if the
+ // row key of this rebuilt index row matches with the passed index row key
+ buildNewTableScan.setAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY, newRowKey);
+ Result result = null;
+ try (ResultScanner resultScanner = oldHTable.getScanner(buildNewTableScan)) {
+ result = resultScanner.next();
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(oldTableName, t);
+ }
+
+ // A single cell will be returned. We decode that here
+ byte[] scanVal = result.value();
+ long code = PLong.INSTANCE.getCodec().decodeLong(new ImmutableBytesWritable(scanVal), SortOrder.getDefault());
+ if (code == GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue()) {
+ if (!extraRowsInNewTable.contains(newRowKey)) {
+ extraRowsInNewTable.add(newRowKey);
+ }
+ // This means there does not exist an old table row for this unverified new table row
+ // Delete the unverified row from the new table
+ Delete del = transformMaintainer.buildRowDeleteMutation(newRowKey,
+ IndexMaintainer.DeleteType.ALL_VERSIONS, cell.getTimestamp());
+ mutations.add(del);
+ }
+ // Write Mutation Batch
+ if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) {
+ writeBatch(mutations, context);
+ mutations.clear();
+ }
+ context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT).setValue(extraRowsInNewTable.size());
+ // Make sure progress is reported to Application Master.
+ context.progress();
+ }
+ } catch (SQLException e) {
+ LOGGER.error(" Error {} while read/write of a record ", e.getMessage());
+ context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void writeBatch(List<Mutation> mutations, Context context)
+ throws IOException, SQLException, InterruptedException {
+ writer.write(mutations);
+ context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(mutations.size());
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ try {
+ // Write the last & final Mutation Batch
+ if (!mutations.isEmpty()) {
+ writeBatch(mutations, context);
+ }
+ // We are writing some dummy key-value as map output here so that we commit only one
+ // output to reducer.
+ context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
+ new IntWritable(0));
+ super.cleanup(context);
+ } catch (SQLException e) {
+ LOGGER.error(" Error {} while read/write of a record ", e.getMessage());
+ context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+ throw new RuntimeException(e);
+ } finally {
+ tryClosingResources();
+ }
+ }
+
+ private void tryClosingResources() throws IOException {
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ } catch (SQLException e) {
+ LOGGER.error("Error while closing connection in the PhoenixIndexMapper class ", e);
+ }
+ }
+ if (this.writer != null) {
+ this.writer.close();
+ }
+ }
+
+}
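The TransformTool diff below is truncated before configureUnverifiedFromNewToOld finishes wiring this mapper, but the general shape of feeding it a raw scan of the new table would follow the standard HBase MapReduce setup; a sketch under that assumption (emptyCF/emptyCQ stand for the new table's empty column and are not from the commit):

    // Sketch: scan the new (transformed) table raw, keep only unverified rows,
    // and hand each to PhoenixTransformRepairMapper.
    Scan scan = new Scan();
    scan.setRaw(true);           // include all versions, as the repair scan below does
    scan.setCacheBlocks(false);  // standard for MapReduce scans
    scan.setFilter(new SingleColumnValueFilter(emptyCF, emptyCQ,
            CompareFilter.CompareOp.EQUAL, UNVERIFIED_BYTES));
    TableMapReduceUtil.initTableMapperJob(newPhysicalTableName, scan,
            PhoenixTransformRepairMapper.class, ImmutableBytesWritable.class,
            IntWritable.class, job);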
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
index f7574d3..40cb74f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.mapreduce.transform;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -28,9 +29,13 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
@@ -42,6 +47,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
@@ -60,6 +67,7 @@ import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.transform.SystemTransformRecord;
import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
@@ -67,10 +75,12 @@ import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,8 +94,11 @@ import java.util.Arrays;
import java.util.List;
import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.createIndexToolTables;
import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet;
import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
+import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
import static org.apache.phoenix.util.QueryUtil.getConnection;
public class TransformTool extends Configured implements Tool {
@@ -105,6 +118,13 @@ public class TransformTool extends Configured implements Tool {
private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true,
"Index table name(not required in case of partial rebuilding)");
+ private static final Option FIX_UNVERIFIED_TRANSFORM_OPTION = new Option("fu", "fix-unverified", false,
+ "To fix unverified transform records");
+
+ private static final Option USE_NEW_TABLE_AS_SOURCE_OPTION =
+ new Option("fn", "from-new", false,
+ "To verify every row in the new table has a corresponding row in the old table. ");
+
private static final Option PARTIAL_TRANSFORM_OPTION = new Option("pt", "partial-transform", false,
"To transform a data table from a start timestamp");
@@ -145,15 +165,24 @@ public class TransformTool extends Configured implements Tool {
private static final Option END_TIME_OPTION = new Option("et", "end-time",
true, "End time for transform");
- public static final String TRANSFORM_JOB_NAME_TEMPLATE = "PHOENIX_TRANS_%s.%s";
+ public static final String TRANSFORM_JOB_NAME_TEMPLATE = "PHOENIX_TRANS_%s.%s.%s";
public static final String PARTIAL_TRANSFORM_NOT_APPLICABLE = "Partial transform accepts "
+ "non-zero ts set in the past as start-time(st) option and that ts must be present in SYSTEM.TRANSFORM table";
- public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes or views or transactional tables";
+ public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not applicable for local indexes or views";
public static final String PARTIAL_TRANSFORM_NOT_COMPATIBLE = "Can't abort/pause/resume/split during partial transform";
+ private static final Option VERIFY_OPTION = new Option("v", "verify", true,
+ "To verify every data row in the old table has a corresponding row in the new table. " +
+ "The accepted values are NONE, ONLY, BEFORE, AFTER, and BOTH. " +
+ "NONE is for no inline verification, which is also the default for this option. ONLY is for " +
+ "verifying without rebuilding the new table rows. The rest for verifying before, after, and both before " +
+ "and after rebuilding row. If the verification is done before rebuilding rows and the correct " +
+ "new table rows will not be rebuilt");
+
+
private Configuration configuration;
private Connection connection;
private String tenantId;
@@ -174,10 +203,13 @@ public class TransformTool extends Configured implements Tool {
private String oldTableWithSchema;
private String newTableWithSchema;
private JobPriority jobPriority;
+ private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
private String jobName;
private boolean isForeground;
private Long startTime, endTime, lastTransformTime;
private boolean isPartialTransform;
+ private boolean shouldFixUnverified;
+ private boolean shouldUseNewTableAsSource;
private Job job;
public Long getStartTime() {
@@ -224,10 +256,13 @@ public class TransformTool extends Configured implements Tool {
options.addOption(PARTIAL_TRANSFORM_OPTION);
options.addOption(START_TIME_OPTION);
options.addOption(END_TIME_OPTION);
+ options.addOption(FIX_UNVERIFIED_TRANSFORM_OPTION);
+ options.addOption(USE_NEW_TABLE_AS_SOURCE_OPTION);
options.addOption(AUTO_SPLIT_OPTION);
options.addOption(ABORT_TRANSFORM_OPTION);
options.addOption(PAUSE_TRANSFORM_OPTION);
options.addOption(RESUME_TRANSFORM_OPTION);
+ options.addOption(VERIFY_OPTION);
START_TIME_OPTION.setOptionalArg(true);
END_TIME_OPTION.setOptionalArg(true);
return options;
@@ -265,6 +300,8 @@ public class TransformTool extends Configured implements Tool {
public int populateTransformToolAttributesAndValidate(CommandLine cmdLine) throws Exception {
boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt());
boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
+ shouldFixUnverified = cmdLine.hasOption(FIX_UNVERIFIED_TRANSFORM_OPTION.getOpt());
+ shouldUseNewTableAsSource = cmdLine.hasOption(USE_NEW_TABLE_AS_SOURCE_OPTION.getOpt());
basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
isPartialTransform = cmdLine.hasOption(PARTIAL_TRANSFORM_OPTION.getOpt());
if (useStartTime) {
@@ -279,11 +316,11 @@ public class TransformTool extends Configured implements Tool {
validateTimeRange(startTime, endTime);
}
- if (isPartialTransform &&
+ if ((isPartialTransform || shouldFixUnverified) &&
(cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt()))) {
throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
}
- if (isPartialTransform &&
+ if ((isPartialTransform || shouldFixUnverified) &&
(cmdLine.hasOption(ABORT_TRANSFORM_OPTION.getOpt()) || cmdLine.hasOption(PAUSE_TRANSFORM_OPTION.getOpt())
|| cmdLine.hasOption(RESUME_TRANSFORM_OPTION.getOpt()))) {
throw new IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
@@ -337,6 +374,11 @@ public class TransformTool extends Configured implements Tool {
oldTableWithSchema = SchemaUtil.getQualifiedPhoenixTableName(schemaName, SchemaUtil.getTableNameFromFullName(pOldTable.getName().getString()));
newTableWithSchema = SchemaUtil.getQualifiedPhoenixTableName(schemaName, SchemaUtil.getTableNameFromFullName(pNewTable.getName().getString()));
+ if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
+ String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
+ verifyType = IndexTool.IndexVerifyType.fromValue(value);
+ }
+
return 0;
}
@@ -350,10 +392,6 @@ public class TransformTool extends Configured implements Tool {
throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
}
- if (argPDataTable.isTransactional()) {
- throw new IllegalArgumentException(TRANSFORM_NOT_APPLICABLE);
- }
-
if (transformRecord == null){
throw new IllegalStateException("ALTER statement has not been run and the transform has not been created for this table");
}
@@ -425,9 +463,23 @@ public class TransformTool extends Configured implements Tool {
}
public Job configureJob() throws Exception {
- final String jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
+ String jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable, (shouldFixUnverified ? "Unverified" : "Full"));
+ if (shouldUseNewTableAsSource) {
+ jobName = String.format(TRANSFORM_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable, "NewTableSource_" + pNewTable.getName());
+ }
+ if (pNewTable.isTransactional()) {
+ configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
+ Long.toString(TransactionUtil.convertToNanoseconds(pOldTable.getTimeStamp()+1)));
+ configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name());
+ }
if (lastTransformTime != null) {
PhoenixConfigurationUtil.setCurrentScnValue(configuration, lastTransformTime);
+ } else if (endTime != null) {
+ PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+ } else {
+ PhoenixConfigurationUtil.setCurrentScnValue(configuration, EnvironmentEdgeManager.currentTimeMillis());
}
final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
@@ -446,6 +498,8 @@ public class TransformTool extends Configured implements Tool {
PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
}
+ PhoenixConfigurationUtil.setIndexVerifyType(configuration, verifyType);
+
long indexRebuildQueryTimeoutMs =
configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
@@ -471,7 +525,12 @@ public class TransformTool extends Configured implements Tool {
PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, oldTableWithSchema);
PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, newTableWithSchema);
- PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+ PhoenixConfigurationUtil.setShouldFixUnverifiedTransform(configuration, shouldFixUnverified);
+ if (shouldFixUnverified || shouldUseNewTableAsSource) {
+ PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE);
+ } else {
+ PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE);
+ }
if (startTime != null) {
PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
}
@@ -497,16 +556,21 @@ public class TransformTool extends Configured implements Tool {
if (outputPath != null) {
FileOutputFormat.setOutputPath(job, outputPath);
}
- job.setReducerClass(PhoenixTransformReducer.class);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ if (shouldFixUnverified) {
+ configureUnverifiedFromNewToOld();
+ } else {
+ configureFromOldToNew();
+ }
//Set the Output classes
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
TableMapReduceUtil.addDependencyJars(job);
- job.setMapperClass(PhoenixServerBuildIndexMapper.class);
+
+ job.setReducerClass(PhoenixTransformReducer.class);
TableMapReduceUtil.initCredentials(job);
LOGGER.info("TransformTool is running for " + job.getJobName());
@@ -514,6 +578,39 @@ public class TransformTool extends Configured implements Tool {
return job;
}
+ private void configureFromOldToNew() {
+ job.setMapperClass(PhoenixServerBuildIndexMapper.class);
+ }
+
+ private void configureUnverifiedFromNewToOld() throws IOException, SQLException {
+ List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(1);
+ TransformMaintainer transformMaintainer = pNewTable.getTransformMaintainer(pOldTable, connection.unwrap(PhoenixConnection.class));
+ maintainers.add(transformMaintainer);
+ Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
+ if (startTime != null) {
+ scan.setTimeRange(startTime - 1, HConstants.LATEST_TIMESTAMP);
+ }
+ scan.setRaw(true);
+ scan.setCacheBlocks(false);
+ SingleColumnValueFilter filter = new SingleColumnValueFilter(
+ transformMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ transformMaintainer.getEmptyKeyValueQualifier(),
+ CompareFilter.CompareOp.EQUAL,
+ UNVERIFIED_BYTES
+ );
+ scan.setFilter(filter);
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+ // Set the Physical Table name for use in DirectHTableWriter#write(Mutation)
+ conf.set(TableOutputFormat.OUTPUT_TABLE,
+ PhoenixConfigurationUtil.getPhysicalTableName(job.getConfiguration()));
+ ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+ TransformMaintainer.serialize(pDataTable, indexMetaDataPtr, pNewTable, connection.unwrap(PhoenixConnection.class));
+ PhoenixConfigurationUtil.setIndexMaintainers(conf, indexMetaDataPtr);
+ TableMapReduceUtil.initTableMapperJob(pNewTable.getPhysicalName().getString(), scan, PhoenixTransformRepairMapper.class, null,
+ null, job);
+ }
+
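
The repair path above is driven entirely by a raw, filtered scan of the new (post-transform) physical table: only rows whose empty column still carries the UNVERIFIED marker ever reach PhoenixTransformRepairMapper. A standalone sketch of that scan setup, with placeholder family/qualifier/marker bytes standing in for what TransformMaintainer and IndexRegionObserver resolve at runtime, and an invented class name:

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class UnverifiedScanSketch {
        // Placeholders: TransformTool gets the real values from the TransformMaintainer.
        private static final byte[] EMPTY_CF = Bytes.toBytes("0");
        private static final byte[] EMPTY_CQ = Bytes.toBytes("_0");
        private static final byte[] UNVERIFIED = Bytes.toBytes("2");

        public static Scan newUnverifiedScan(Long startTime) throws java.io.IOException {
            Scan scan = new Scan();
            if (startTime != null) {
                // Widen the lower bound by 1 ms, as the tool does above.
                scan.setTimeRange(startTime - 1, HConstants.LATEST_TIMESTAMP);
            }
            scan.setRaw(true);          // surface all cell versions and delete markers
            scan.setCacheBlocks(false); // a one-off repair scan should not churn the block cache
            // Keep only rows whose empty column still equals the UNVERIFIED marker.
            scan.setFilter(new SingleColumnValueFilter(
                    EMPTY_CF, EMPTY_CQ, CompareFilter.CompareOp.EQUAL, UNVERIFIED));
            return scan;
        }
    }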
public int runJob() throws IOException {
try {
if (isForeground) {
@@ -731,6 +828,7 @@ public class TransformTool extends Configured implements Tool {
try (Connection conn = getConnection(configuration)) {
this.connection = conn;
this.connection.setAutoCommit(true);
+ createIndexToolTables(conn);
populateTransformToolAttributesAndValidate(cmdLine);
if (cmdLine.hasOption(ABORT_TRANSFORM_OPTION.getOpt())) {
abortTransform();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 9d7df7e..a610a90 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -137,6 +137,11 @@ public final class PhoenixConfigurationUtil {
public static final boolean DEFAULT_SCRUTINY_OUTPUT_INVALID_ROWS = false;
+ public static final String SHOULD_FIX_UNVERIFIED_TRANSFORM =
+ "phoenix.mr.fix.unverified.transform";
+
+ public static final boolean DEFAULT_SHOULD_FIX_UNVERIFIED_TRANSFORM = false;
+
public static final String SCRUTINY_OUTPUT_FORMAT = "phoenix.mr.scrutiny.output.format";
public static final String SCRUTINY_EXECUTE_TIMESTAMP = "phoenix.mr.scrutiny.execute.timestamp";
@@ -853,6 +858,18 @@ public final class PhoenixConfigurationUtil {
return IndexTool.IndexVerifyType.fromValue(value);
}
+ public static boolean getShouldFixUnverifiedTransform(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ return configuration.getBoolean(SHOULD_FIX_UNVERIFIED_TRANSFORM,
+ DEFAULT_SHOULD_FIX_UNVERIFIED_TRANSFORM);
+ }
+
+ public static void setShouldFixUnverifiedTransform(Configuration configuration,
+ boolean shouldFixUnverified) {
+ Preconditions.checkNotNull(configuration);
+ configuration.setBoolean(SHOULD_FIX_UNVERIFIED_TRANSFORM, shouldFixUnverified);
+ }
+
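
The new flag follows the PhoenixConfigurationUtil house pattern: a namespaced key, an explicit default, and null-checked accessors. A quick usage sketch against a plain Hadoop Configuration (class name invented):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

    public class FixUnverifiedFlagSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Unset key -> DEFAULT_SHOULD_FIX_UNVERIFIED_TRANSFORM, i.e. false.
            System.out.println(PhoenixConfigurationUtil.getShouldFixUnverifiedTransform(conf));
            PhoenixConfigurationUtil.setShouldFixUnverifiedTransform(conf, true);
            System.out.println(PhoenixConfigurationUtil.getShouldFixUnverifiedTransform(conf));
        }
    }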
public static IndexTool.IndexVerifyType getDisableLoggingVerifyType(Configuration configuration) {
Preconditions.checkNotNull(configuration);
String value = configuration.get(DISABLE_LOGGING_TYPE, IndexTool.IndexVerifyType.NONE.getValue());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index c6713df..fb2952a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -4809,6 +4809,10 @@ public class MetaDataClient {
}
if (isTransformNeeded) {
+ if (indexRef.getTable().getViewIndexId() != null) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_TRANSFORM_VIEW_INDEX)
+ .setSchemaName(schemaName).setTableName(indexName).build().buildException();
+ }
try {
Transform.addTransform(connection, tenantId, table, metaProperties, seqNum, PTable.TransformType.METADATA_TRANSFORM);
} catch (SQLException ex) {
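
The guard above rejects a transform on a view index up front: view indexes live in a shared physical table keyed by viewIndexId, so rewriting one in place is not safe, and the attempt now surfaces as CANNOT_TRANSFORM_VIEW_INDEX instead of failing later. A hedged JDBC illustration; the connection URL, object names, and the exact ALTER INDEX form that triggers a transform are all assumptions here:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    public class ViewIndexTransformSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                // IDX is assumed to be an index on a view (non-null viewIndexId).
                conn.createStatement().execute(
                        "ALTER INDEX IDX ON MY_VIEW ACTIVE "
                        + "IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
            } catch (SQLException e) {
                // Expected here: the error code for SQLExceptionCode.CANNOT_TRANSFORM_VIEW_INDEX.
                System.out.println(e.getErrorCode() + ": " + e.getMessage());
            }
        }
    }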
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
index 8e400b1..cb7dc29 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
@@ -47,6 +47,7 @@ import java.util.List;
import java.util.ArrayList;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
import static org.apache.phoenix.util.MetaDataUtil.SYNCED_DATA_TABLE_AND_INDEX_COL_FAM_PROPERTIES;
@@ -389,6 +390,13 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
continue;
}
}
+
+ if (key.contains("TTL") && definedProps.containsKey(TRANSACTION_PROVIDER)
+ && definedProps.get(TRANSACTION_PROVIDER).equalsIgnoreCase("OMID")) {
+ // TTL is not supported for OMID transactional tables
+ continue;
+ }
+
if (optionBuilder.length() != 0) {
optionBuilder.append(", ");
}
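
The skip above keeps extracted DDL replayable: OMID-managed transactional tables do not accept TTL, so TTL-flavored properties are dropped from the generated statement rather than emitted and then refused at replay. The condition reduces to a null-safe one-liner, restated here as a hypothetical helper (flipping the comparison onto the "OMID" literal makes the containsKey check redundant):

    // Hypothetical restatement of the filter above; TRANSACTION_PROVIDER is the
    // same PhoenixDatabaseMetaData key imported at the top of the file.
    static boolean skipTtlForOmid(String key, java.util.Map<String, String> definedProps) {
        return key.contains("TTL")
                && "OMID".equalsIgnoreCase(definedProps.get(TRANSACTION_PROVIDER));
    }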
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index 79e80d3..80d32e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.schema.transform;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -45,6 +46,11 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SaltingUtil;
+
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.ValueSchema;
+import org.apache.phoenix.schema.tuple.BaseTuple;
+import org.apache.phoenix.schema.tuple.ValueGetterTuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -134,7 +140,7 @@ public class TransformMaintainer extends IndexMaintainer {
this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
- this.newTableName = newTable.getPhysicalName().getBytes();
+ this.newTableName = SchemaUtil.getTableName(newTable.getSchemaName(), newTable.getTableName()).getBytes();
boolean newTableWALDisabled = newTable.isWALDisabled();
int nNewTableColumns = newTable.getColumns().size();
int nNewTablePKColumns = newTable.getPKColumns().size();
@@ -185,6 +191,42 @@ public class TransformMaintainer extends IndexMaintainer {
}
}
+ /*
+ * Builds the old (pre-transform) table row key from the given new table row key.
+ */
+ public byte[] buildDataRowKey(ImmutableBytesWritable indexRowKeyPtr, byte[][] viewConstants) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedNewTableRowKeyBytes);
+ DataOutput output = new DataOutputStream(stream);
+
+ try {
+ int dataPosOffset = 0;
+ int maxRowKeyOffset = indexRowKeyPtr.getLength();
+
+ oldTableRowKeySchema.iterator(indexRowKeyPtr, ptr, 0);
+ // The oldTableRowKeySchema includes the salt byte field, so iteration starts at position 0.
+ while (oldTableRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset) != null) {
+ output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
+ if (!oldTableRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) {
+ output.writeByte(SchemaUtil.getSeparatorByte(oldTableRowKeySchema.rowKeyOrderOptimizable(),
+ ptr.getLength() == 0, oldTableRowKeySchema.getField(dataPosOffset)));
+ }
+ dataPosOffset++;
+ }
+
+ byte[] oldTableRowKey = stream.getBuffer();
+ return oldTableRowKey;
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ }
+ }
+
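
Because a transform never changes the primary key, buildDataRowKey can rebuild the old row key simply by walking oldTableRowKeySchema over the new row key and re-emitting each field. A hedged trace for a hypothetical two-column VARCHAR key with values "a" and "bc" (note that the loop as written appends a separator byte after every variable-width field, including the last one):

    // Hypothetical trace for PK (K1 VARCHAR, K2 VARCHAR) = ("a", "bc"),
    // ascending sort order, so the separator byte is 0x00:
    //   write "a"      -> 0x61
    //   separator      -> 0x00   (K1 is variable-width)
    //   write "bc"     -> 0x62 0x63
    //   separator      -> 0x00   (appended after K2 as well, per the loop above)
    byte[] rebuiltKey = new byte[] { 0x61, 0x00, 0x62, 0x63, 0x00 };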
/**
* Init calculated state reading/creating
*/
@@ -232,6 +274,11 @@ public class TransformMaintainer extends IndexMaintainer {
ptr.set(stream.toByteArray(), 0, stream.size());
}
+ @Override
+ public Iterator<ColumnReference> iterator() {
+ return newTableColumns.iterator();
+ }
+
public static ServerCachingProtos.TransformMaintainer toProto(TransformMaintainer maintainer) throws IOException {
ServerCachingProtos.TransformMaintainer.Builder builder = ServerCachingProtos.TransformMaintainer.newBuilder();
builder.setSaltBuckets(maintainer.nNewTableSaltBuckets);
@@ -470,4 +517,11 @@ public class TransformMaintainer extends IndexMaintainer {
return newTableEmptyKeyValueRef.getQualifier();
}
+ public byte[] getDataEmptyKeyValueCF() {
+ return oldTableEmptyKeyValueCF;
+ }
+
+ public byte[] getEmptyKeyValueQualifierForDataTable() {
+ return oldTableEmptyKeyValueRef.getQualifier();
+ }
}
\ No newline at end of file