You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/07/06 14:44:42 UTC
[phoenix] branch master updated: PHOENIX-6141 : Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK (#1575)
This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 61180892c2 PHOENIX-6141 : Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK (#1575)
61180892c2 is described below
commit 61180892c2f71f5e7eb2956c0e226cd03dda48f1
Author: palash <pa...@gmail.com>
AuthorDate: Thu Jul 6 07:44:36 2023 -0700
PHOENIX-6141 : Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK (#1575)
Implemented a new read-repair based mechanism to ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK tables.
---------
Co-authored-by: Palash Chauhan <p....@pchauha-ltm8owy.internal.salesforce.com>
---
.../end2end/ConnectionQueryServicesTestImpl.java | 19 +-
.../phoenix/end2end/OrphanChildLinkRowsIT.java | 270 +++++++++++++++++++++
.../org/apache/phoenix/end2end/ViewMetadataIT.java | 19 +-
.../coprocessor/ChildLinkMetaDataEndpoint.java | 227 ++++++++++++++++-
.../phoenix/coprocessor/ReadRepairScanner.java | 180 ++++++++++++++
.../phoenix/coprocessor/TaskRegionObserver.java | 1 +
.../coprocessor/tasks/ChildLinkScanTask.java | 76 ++++++
.../phoenix/query/ConnectionQueryServicesImpl.java | 114 +++++++--
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../apache/phoenix/query/QueryServicesOptions.java | 2 +
.../java/org/apache/phoenix/schema/PTable.java | 3 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 13 +
12 files changed, 892 insertions(+), 34 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index e342f50354..0496f86e88 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -24,11 +24,13 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.query.ConnectionQueryServicesImpl;
@@ -62,7 +64,22 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
// Use Provider.values() instead of Provider.available() here because array accesses will be
// indexed by ordinal.
private final PhoenixTransactionService[] txServices = new PhoenixTransactionService[TransactionFactory.Provider.values().length];
-
+
+ private static boolean failPhaseThreeChildLinkWriteForTesting = false;
+ public static void setFailPhaseThreeChildLinkWriteForTesting(boolean fail) {
+ failPhaseThreeChildLinkWriteForTesting = fail;
+ }
+
+ /**
+  * Test hook around the SYSTEM.CHILD_LINK write path. While
+  * {@code failPhaseThreeChildLinkWriteForTesting} is set, any call that marks child links
+  * VERIFIED or deletes them is rejected with a {@link SQLException}; every other call is
+  * delegated unchanged to the parent implementation.
+  *
+  * @throws SQLException when the simulated failure applies to this call, or when the
+  *         delegated write throws
+  */
+ @Override
+ public void sendChildLinkMutations(List<Mutation> mutations, boolean isVerified, boolean isDelete,
+         byte[] physicalTableNameBytes, byte[] schemaBytes) throws SQLException {
+     // Only the final phase of CREATE VIEW sets links to VERIFIED or deletes them
+     boolean isPhaseThreeWrite = isVerified || isDelete;
+     if (failPhaseThreeChildLinkWriteForTesting && isPhaseThreeWrite) {
+         throw new SQLException("Simulating phase-3 write failure");
+     }
+     super.sendChildLinkMutations(mutations, isVerified, isDelete, physicalTableNameBytes,
+             schemaBytes);
+ }
+
public ConnectionQueryServicesTestImpl(QueryServices services, ConnectionInfo info, Properties props) throws SQLException {
super(services, info, props);
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrphanChildLinkRowsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrphanChildLinkRowsIT.java
new file mode 100644
index 0000000000..9211b75939
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrphanChildLinkRowsIT.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.coprocessor.tasks.ChildLinkScanTask;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class OrphanChildLinkRowsIT extends BaseTest {
+
+ private final boolean pagingEnabled;
+ private final String CREATE_TABLE_DDL = "CREATE TABLE %s (TENANT_ID VARCHAR NOT NULL, A INTEGER NOT NULL, B INTEGER CONSTRAINT PK PRIMARY KEY (TENANT_ID, A))";
+ private final String CREATE_VIEW_DDL = "CREATE VIEW %s (NEW_COL1 INTEGER, NEW_COL2 INTEGER) AS SELECT * FROM %s WHERE B > 10";
+
+ public OrphanChildLinkRowsIT(boolean pagingEnabled) {
+ this.pagingEnabled = pagingEnabled;
+ }
+
+ @Parameterized.Parameters(name="OrphanChildLinkRowsIT_pagingEnabled={0}")
+ public static synchronized Collection<Boolean> data() {
+ return Arrays.asList(false, true);
+ }
+
+ /**
+  * Starts the test driver with the child link row age threshold set to 0 ms, so the
+  * read-repair scanner may delete an unverifiable child link row without waiting for it
+  * to age.
+  */
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+     Map<String, String> serverProps = new HashMap<>();
+     serverProps.put(QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, "0");
+     ReadOnlyProps readOnlyProps = new ReadOnlyProps(serverProps.entrySet().iterator());
+     setUpTestDriver(readOnlyProps);
+ }
+
+ /**
+ * Disable the child link scan task.
+ * Create tables T1 and T2. Create 3 views on T1.
+ * Create a view (same name as existing view on T1) on T2. This CREATE VIEW will fail.
+ * Verify that no orphan child link was left behind for T2.
+ *
+ * Instrument CQSI to fail phase three of CREATE VIEW. Create a new view V4 on T2 (passes) and V1 on T2 which will fail.
+ * Both links T2->V4 and T2->V1 will be left in UNVERIFIED state and are repaired during read.
+ * Check that T2 ends up with exactly 1 child link.
+ */
+ @Test
+ public void testNoOrphanChildLinkRow() throws Exception {
+
+ ConnectionQueryServicesTestImpl.setFailPhaseThreeChildLinkWriteForTesting(false);
+ ChildLinkScanTask.disableChildLinkScanTask(true);
+
+ String tableName1 = "T_" + generateUniqueName();
+ String tableName2 = "T_" + generateUniqueName();
+ String sameViewName = "V_"+generateUniqueName();
+
+ try (Connection connection = DriverManager.getConnection(getUrl())) {
+ connection.createStatement().execute(String.format(CREATE_TABLE_DDL, tableName1));
+ connection.createStatement().execute(String.format(CREATE_TABLE_DDL, tableName2));
+
+ //create 3 views in the first table
+ connection.createStatement().execute(String.format(CREATE_VIEW_DDL, sameViewName, tableName1));
+ for (int i=0; i<2; i++) {
+ connection.createStatement().execute(String.format(CREATE_VIEW_DDL, "V_"+generateUniqueName(), tableName1));
+ }
+
+ connection.createStatement().execute(String.format(CREATE_VIEW_DDL, sameViewName, tableName2));
+ Assert.fail("Exception should have been thrown when creating a view with same name on a different table.");
+ }
+ catch (TableAlreadyExistsException e) {
+ //expected since we are creating a view with the same name as an existing view
+ }
+
+ verifyNoOrphanChildLinkRow(tableName1, 3);
+ verifyNoOrphanChildLinkRow(tableName2, 0);
+
+ // configure CQSI to fail the last write phase of CREATE VIEW
+ // where child link mutations are set to VERIFIED or are deleted
+ ConnectionQueryServicesTestImpl.setFailPhaseThreeChildLinkWriteForTesting(true);
+
+ try (Connection connection = DriverManager.getConnection(getUrl())) {
+ connection.createStatement().execute(String.format(CREATE_VIEW_DDL, "V_"+generateUniqueName(), tableName2));
+
+ connection.createStatement().execute(String.format(CREATE_VIEW_DDL, sameViewName, tableName2));
+ Assert.fail("Exception should have been thrown when creating a view with same name on a different table.");
+ }
+ catch (TableAlreadyExistsException e) {
+ //expected since we are creating a view with the same name as an existing view
+ }
+ verifyNoOrphanChildLinkRow(tableName1, 3);
+ verifyNoOrphanChildLinkRow(tableName2, 1);
+ }
+
+ /**
+ * Enable child link scan task and configure CQSI to fail the last write phase of CREATE VIEW
+ * Create 2 tables X and Y.
+ * Create a view (same name as existing view on table X) on table Y.
+ * Verify if all rows in HBase table are VERIFIED after Task finishes.
+ */
+ @Test
+ public void testChildLinkScanTaskRepair() throws Exception {
+
+ ConnectionQueryServicesTestImpl.setFailPhaseThreeChildLinkWriteForTesting(true);
+ ChildLinkScanTask.disableChildLinkScanTask(false);
+
+ String tableName1 = "T_" + generateUniqueName();
+ String tableName2 = "T_" + generateUniqueName();
+ String sameViewName = "V_"+generateUniqueName();
+
+ try (Connection connection = DriverManager.getConnection(getUrl())) {
+ connection.createStatement().execute(String.format(CREATE_TABLE_DDL, tableName1));
+ connection.createStatement().execute(String.format(CREATE_TABLE_DDL, tableName2));
+ connection.createStatement().execute(String.format(CREATE_VIEW_DDL, sameViewName, tableName1));
+
+ connection.createStatement().execute(String.format(CREATE_VIEW_DDL, sameViewName, tableName2));
+ Assert.fail("Exception should have been thrown when creating a view with same name on a different table.");
+ }
+ catch (TableAlreadyExistsException e) {
+ //expected since we are creating a view with the same name as an existing view
+ }
+
+ try (Connection connection = DriverManager.getConnection(getUrl())) {
+ // wait for TASKs to complete
+ waitForChildLinkScanTasks();
+
+ // scan the physical table and check there are no UNVERIFIED rows
+ PTable childLinkPTable = PhoenixRuntime.getTable(connection, SYSTEM_CHILD_LINK_NAME);
+ byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(childLinkPTable);
+ byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(childLinkPTable).getFirst();
+ Scan scan = new Scan();
+ HTable table = (HTable) connection.unwrap(PhoenixConnection.class).getQueryServices().getTable(TableName.valueOf(SYSTEM_CHILD_LINK_NAME).getName());
+ ResultScanner results = table.getScanner(scan);
+ Result result = results.next();
+ while (result != null) {
+ Assert.assertTrue("Found Child Link row with UNVERIFIED status", Arrays.equals(result.getValue(emptyCF, emptyCQ), VERIFIED_BYTES));
+ result = results.next();
+ }
+ }
+ }
+
+ /**
+ * Do multiple times: Create 2 tables and view with same name on both tables.
+ * Check if LIMIT query on SYSTEM.CHILD_LINK returns the right number of rows
+ * Check if only one child link is returned for every table.
+ */
+ @Test
+ public void testChildLinkQueryWithLimit() throws Exception {
+
+ ConnectionQueryServicesTestImpl.setFailPhaseThreeChildLinkWriteForTesting(true);
+ ChildLinkScanTask.disableChildLinkScanTask(true);
+
+ Properties props = new Properties();
+ if (pagingEnabled) {
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, "0");
+ }
+ Map<String, String> expectedChildLinks = new HashMap<>();
+ try (Connection connection = DriverManager.getConnection(getUrl(), props)) {
+ for (int i=0; i<7; i++) {
+ String table1 = "T_" + generateUniqueName();
+ String table2 = "T_" + generateUniqueName();
+ String view = "V_" + generateUniqueName();
+ connection.createStatement().execute(String.format(CREATE_TABLE_DDL, table1));
+ connection.createStatement().execute(String.format(CREATE_TABLE_DDL, table2));
+ connection.createStatement().execute(String.format(CREATE_VIEW_DDL, view, table1));
+ expectedChildLinks.put(table1, view);
+ try {
+ connection.createStatement().execute(String.format(CREATE_VIEW_DDL, view, table2));
+ Assert.fail("Exception should have been thrown when creating a view with same name on a different table.");
+ }
+ catch (TableAlreadyExistsException e) {
+
+ }
+ }
+
+ String childLinkQuery = "SELECT * FROM SYSTEM.CHILD_LINK LIMIT 5";
+ ResultSet rs = connection.createStatement().executeQuery(childLinkQuery);
+ int count = 0;
+ while (rs.next()) {
+ count++;
+ }
+ Assert.assertEquals("Incorrect number of child link rows returned", 5, count);
+ }
+ for (String table : expectedChildLinks.keySet()) {
+ verifyNoOrphanChildLinkRow(table, 1);
+ }
+ }
+
+ /**
+  * Counts the SYSTEM.CHILD_LINK rows visible through a Phoenix query and asserts that the
+  * count equals {@code numExpectedChildLinks}.
+  *
+  * @param table parent table name used as the TABLE_NAME filter; when {@code null} all rows
+  *              of SYSTEM.CHILD_LINK are counted
+  * @param numExpectedChildLinks exact number of rows the query must return
+  */
+ private void verifyNoOrphanChildLinkRow(String table, int numExpectedChildLinks) throws Exception {
+     String childLinkQuery = "SELECT * FROM SYSTEM.CHILD_LINK";
+     if (table != null) {
+         childLinkQuery += (" WHERE TABLE_NAME=" + "'" + table + "'");
+     }
+
+     int count = 0;
+     Properties props = new Properties();
+     if (pagingEnabled) {
+         props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, "0");
+     }
+     // Both the connection and the result set are closed before asserting
+     try (Connection connection = DriverManager.getConnection(getUrl(), props);
+          ResultSet rs = connection.createStatement().executeQuery(childLinkQuery)) {
+         while (rs.next()) {
+             count++;
+         }
+     }
+     // Two separate assertions so the failure message says whether rows are extra or missing
+     Assert.assertTrue("Found Orphan Linking Row", count <= numExpectedChildLinks);
+     Assert.assertTrue("All expected Child Links not returned by query", count >= numExpectedChildLinks);
+ }
+
+ /**
+  * Wait for all child link scan tasks to finish.
+  *
+  * Polls SYSTEM.TASK up to 10 times, sleeping 2 seconds before each poll, and returns as soon
+  * as no CHILD_LINK_SCAN task has a status other than COMPLETED. The wait is best effort:
+  * if tasks are still pending after the last poll the method returns normally, and callers
+  * are expected to assert on the resulting table state themselves.
+  */
+ public static void waitForChildLinkScanTasks() throws Exception {
+     final int maxTries = 10;
+     final int sleepIntervalMs = 2000;
+     final String pendingTasksQuery = "SELECT COUNT(*) FROM SYSTEM.TASK WHERE " +
+             PhoenixDatabaseMetaData.TASK_TYPE + " = " + PTable.TaskType.CHILD_LINK_SCAN.getSerializedValue() +
+             " AND " + PhoenixDatabaseMetaData.TASK_STATUS + " != 'COMPLETED'";
+     try (Connection connection = DriverManager.getConnection(getUrl())) {
+         for (int nTries = 0; nTries < maxTries; nTries++) {
+             Thread.sleep(sleepIntervalMs);
+             int numPendingTasks;
+             // Release each poll's result set instead of leaking one per iteration
+             try (ResultSet rs = connection.createStatement().executeQuery(pendingTasksQuery)) {
+                 rs.next();
+                 numPendingTasks = rs.getInt(1);
+             }
+             if (numPendingTasks == 0) {
+                 break;
+             }
+         }
+     }
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
index 7e90118363..219eb8aa83 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
@@ -691,11 +691,13 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
String parent2, String viewSchema, String viewName)
throws SQLException {
- final String querySysChildLink =
- "SELECT * FROM SYSTEM.CHILD_LINK WHERE TABLE_SCHEM='%s' AND "
- + "TABLE_NAME='%s' AND COLUMN_FAMILY='%s' AND "
- + LINK_TYPE + " = " +
- PTable.LinkType.CHILD_TABLE.getSerializedValue();
+ /*
+ After PHOENIX-6141, view creation happens in three phases to avoid orphan rows in
+ SYSTEM.CHILD_LINK. We can instrument test CQSI to fail the third phase which will
+ lead to UNVERIFIED orphan rows needed for some tests in this class.
+ */
+ ConnectionQueryServicesTestImpl.setFailPhaseThreeChildLinkWriteForTesting(true);
+
try (Connection conn = DriverManager.getConnection(getUrl());
Statement stmt = conn.createStatement()) {
stmt.execute(String.format(CREATE_BASE_TABLE_DDL, parentSchema,
@@ -711,13 +713,6 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
} catch (TableAlreadyExistsException ignored) {
// expected
}
-
- // Confirm that the orphan parent->child link exists after the
- // second view creation
- ResultSet rs = stmt.executeQuery(String.format(querySysChildLink,
- parentSchema, parent2, SchemaUtil.getTableName(
- viewSchema, viewName)));
- assertTrue(rs.next());
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java
index 90f516c170..46127eb5e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java
@@ -20,12 +20,28 @@ package org.apache.phoenix.coprocessor;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateViewAddChildLinkRequest;
import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.ChildLinkMetaDataService;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
@@ -34,24 +50,40 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CHECK_VERIFY_COLUMN;
import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.mutateRowsWithLocks;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
+import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
+
/**
* Endpoint co-processor through which Phoenix metadata mutations for SYSTEM.CHILD_LINK flow.
* The {@code parent->child } links ({@link org.apache.phoenix.schema.PTable.LinkType#CHILD_TABLE})
* are stored in the SYSTEM.CHILD_LINK table.
+ *
+ * After PHOENIX-6141, this also serves as an observer coprocessor
+ * that verifies scanned rows of SYSTEM.CHILD_LINK table.
*/
-public class ChildLinkMetaDataEndpoint extends ChildLinkMetaDataService implements RegionCoprocessor {
+public class ChildLinkMetaDataEndpoint extends ChildLinkMetaDataService
+ implements RegionCoprocessor, RegionObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(ChildLinkMetaDataEndpoint.class);
private RegionCoprocessorEnvironment env;
@@ -111,4 +143,197 @@ public class ChildLinkMetaDataEndpoint extends ChildLinkMetaDataService implemen
return phoenixAccessCoprocessorHost;
}
+ /**
+ * Class that verifies a given row of a SYSTEM.CHILD_LINK table.
+ * An instance of this class is created for each scanner on the table
+ * and used to verify individual rows.
+ */
+ public class ChildLinkMetaDataScanner extends ReadRepairScanner {
+
+ private Table sysCatHTable;
+ private Scan childLinkScan;
+ private Scan sysCatViewHeaderRowScan;
+ private Scan sysCatChildParentLinkScan;
+ private final String NULL_DELIMITER = "\0";
+
+ public ChildLinkMetaDataScanner(RegionCoprocessorEnvironment env,
+ Scan scan,
+ RegionScanner scanner) throws IOException {
+ super(env, scan, scanner);
+ sysCatChildParentLinkScan = new Scan();
+ setSysCatViewHeaderRowScan();
+ ageThreshold = env.getConfiguration().getLong(
+ QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+ QueryServicesOptions.DEFAULT_CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS);
+ sysCatHTable = ServerUtil.ConnectionFactory.
+ getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env).
+ getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+ }
+
+ /**
+  * Decides whether a scanned row is VERIFIED by inspecting its empty column cell.
+  * An empty cell list counts as verified. For a verified row with more than one cell, the
+  * empty column cell is removed from {@code cellList} before returning.
+  *
+  * @param cellList cells of one row; may be modified as described above
+  * @return {@code true} if the list is empty or the empty column holds VERIFIED_BYTES;
+  *         {@code false} if the empty column holds any other value or is missing
+  */
+ @Override
+ public boolean verifyRow(List<Cell> cellList) {
+     final int numCells = cellList.size();
+     if (numCells == 0) {
+         return true;
+     }
+     for (Iterator<Cell> it = cellList.iterator(); it.hasNext();) {
+         Cell current = it.next();
+         if (!isEmptyColumn(current)) {
+             continue;
+         }
+         boolean verified = Bytes.compareTo(
+                 current.getValueArray(), current.getValueOffset(), current.getValueLength(),
+                 VERIFIED_BYTES, 0, VERIFIED_BYTES.length) == 0;
+         // Empty column is not supposed to be returned to the client except
+         // when it is the only column included in the scan
+         if (verified && numCells > 1) {
+             it.remove();
+         }
+         return verified;
+     }
+     // no empty column found
+     return false;
+ }
+
+ /*
+ Repair an unverified SYSTEM.CHILD_LINK row.
+ Look up the corresponding link row in SYSTEM.CATALOG and, only if that exists,
+ the view header row of the child in SYSTEM.CATALOG.
+ If both are found, mark the child link row VERIFIED and reopen the region scanner
+ starting (inclusive) at this row key so the row is read again.
+ Otherwise, delete the child link row if it is older than the age threshold.
+ In both cases the passed-in cell list is cleared before returning.
+ */
+ @Override
+ public void repairRow(List<Cell> row) throws IOException {
+ Cell cell = row.get(0);
+ byte[] rowKey = CellUtil.cloneRow(cell);
+ long ts = row.get(0).getTimestamp(); // timestamp of the first cell is used for the whole row
+
+ childLinkScan = new Scan(scan); // copy of the scan this scanner was opened with
+
+ boolean isChildParentLinkPresent = isRowPresentInSysCat(sysCatChildParentLinkScan,
+ getChildParentLinkSysCatRowKey(rowKey));
+
+ boolean isViewHeaderRowPresent = false;
+ if (isChildParentLinkPresent) {
+ isViewHeaderRowPresent = isRowPresentInSysCat(sysCatViewHeaderRowScan,
+ getViewHeaderSysCatRowKey(rowKey));
+ }
+ // if both SYSTEM.CATALOG rows were found, mark VERIFIED and rescan from this row
+ if (isChildParentLinkPresent && isViewHeaderRowPresent) {
+ markChildLinkVerified(rowKey, ts, region);
+ scanner.close();
+ childLinkScan.withStartRow(rowKey, true);
+ scanner = region.getScanner(childLinkScan);
+ hasMore = true;
+ }
+ // if not, delete if old enough, otherwise ignore
+ else {
+ deleteIfAgedEnough(rowKey, ts, region);
+ }
+ row.clear();
+ }
+
+ /**
+  * Checks whether SYSTEM.CATALOG contains a non-empty row with exactly the given row key.
+  * The supplied scan object is modified: its start and stop rows are both set (inclusive)
+  * to {@code rowKey} and its time range to {@code [0, maxTimestamp)}.
+  *
+  * @param scan scan to reuse for the lookup, including any columns/filters already set on it
+  * @param rowKey SYSTEM.CATALOG row key to look for
+  * @return {@code true} if the scan returns a non-empty first result
+  * @throws IOException if the lookup fails; failures are routed through
+  *         {@code ServerUtil.throwIOException}
+  */
+ private boolean isRowPresentInSysCat(Scan scan, byte[] rowKey) throws IOException {
+     scan.withStartRow(rowKey, true);
+     scan.withStopRow(rowKey, true);
+     scan.setTimeRange(0, maxTimestamp);
+
+     Result firstRow = null;
+     try (ResultScanner sysCatScanner = sysCatHTable.getScanner(scan)) {
+         firstRow = sysCatScanner.next();
+     } catch (Throwable t) {
+         ServerUtil.throwIOException(sysCatHTable.getName().toString(), t);
+     }
+
+     if (firstRow == null) {
+         return false;
+     }
+     return !firstRow.isEmpty();
+ }
+
+ /**
+  * Builds the SYSTEM.CATALOG view header row key from a SYSTEM.CHILD_LINK row key.
+  * The child link key is split on the null byte into exactly 5 parts; parts 3 and 4
+  * (child tenant id, child full name) become (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE).
+  *
+  * @throws IllegalArgumentException if the key does not split into exactly 5 parts
+  */
+ private byte[] getViewHeaderSysCatRowKey(byte[] childLinkRowKey) {
+     String decodedKey = new String(childLinkRowKey, StandardCharsets.UTF_8);
+     String[] keyParts = decodedKey.split(NULL_DELIMITER);
+     checkArgument(keyParts.length == 5);
+
+     String viewTenantId = keyParts[3];
+     String viewFullName = keyParts[4];
+     String viewSchema = SchemaUtil.getSchemaNameFromFullName(viewFullName);
+     String viewTable = SchemaUtil.getTableNameFromFullName(viewFullName);
+
+     String sysCatRowKey = String.join(NULL_DELIMITER, viewTenantId, viewSchema, viewTable);
+     return sysCatRowKey.getBytes(StandardCharsets.UTF_8);
+ }
+
+ /**
+  * Builds the row key of the SYSTEM.CATALOG link row that corresponds to a given
+  * SYSTEM.CHILD_LINK row key, by rearranging the 5 null-separated key parts:
+  * SYSTEM.CHILD_LINK -> (PARENT_TENANT_ID, PARENT_SCHEMA, PARENT_TABLE, CHILD_TENANT_ID, CHILD_FULL_NAME)
+  * SYSTEM.CATALOG -> (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE, PARENT_TENANT_ID, PARENT_FULL_NAME)
+  *
+  * @throws IllegalArgumentException if the key does not split into exactly 5 parts
+  */
+ private byte[] getChildParentLinkSysCatRowKey(byte[] childLinkRowKey) {
+     String decodedKey = new String(childLinkRowKey, StandardCharsets.UTF_8);
+     String[] keyParts = decodedKey.split(NULL_DELIMITER);
+     checkArgument(keyParts.length == 5);
+
+     // Parent side of the link
+     String ownerTenantId = keyParts[0];
+     String ownerFullName = SchemaUtil.getTableName(keyParts[1], keyParts[2]);
+
+     // Child (view) side of the link
+     String viewTenantId = keyParts[3];
+     String viewFullName = keyParts[4];
+     String viewSchema = SchemaUtil.getSchemaNameFromFullName(viewFullName);
+     String viewTable = SchemaUtil.getTableNameFromFullName(viewFullName);
+
+     String sysCatRowKey = String.join(NULL_DELIMITER,
+             viewTenantId, viewSchema, viewTable, ownerTenantId, ownerFullName);
+     return sysCatRowKey.getBytes(StandardCharsets.UTF_8);
+ }
+
+
+ /**
+  * Deletes the row with the given key from the region, but only when the time elapsed
+  * since {@code ts} is strictly greater than {@code ageThreshold}; younger rows are left
+  * untouched.
+  */
+ private void deleteIfAgedEnough(byte[] rowKey, long ts, Region region) throws IOException {
+     long rowAgeMs = EnvironmentEdgeManager.currentTimeMillis() - ts;
+     if (rowAgeMs <= ageThreshold) {
+         return;
+     }
+     Delete rowDelete = new Delete(rowKey);
+     region.batchMutate(new Mutation[] {rowDelete});
+ }
+
+
+ /**
+  * Writes VERIFIED_BYTES into the empty column of the given row, using {@code ts} both as
+  * the Put timestamp and as the cell timestamp.
+  */
+ private void markChildLinkVerified(byte[] rowKey, long ts, Region region) throws IOException {
+     Put verifiedPut = new Put(rowKey, ts);
+     verifiedPut.addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+     region.batchMutate(new Mutation[] {verifiedPut});
+ }
+
+ /**
+  * Initializes {@code sysCatViewHeaderRowScan}: a scan that reads only the TABLE_TYPE column
+  * and keeps rows whose TABLE_TYPE equals the serialized value of {@link PTableType#VIEW}.
+  */
+ private void setSysCatViewHeaderRowScan() {
+     sysCatViewHeaderRowScan = new Scan();
+     sysCatViewHeaderRowScan.addColumn(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
+     // Encode with an explicit charset so the comparison bytes do not depend on the
+     // JVM's platform default encoding.
+     SingleColumnValueFilter tableTypeFilter = new SingleColumnValueFilter(
+             TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES, CompareOperator.EQUAL,
+             PTableType.VIEW.getSerializedValue().getBytes(StandardCharsets.UTF_8));
+     sysCatViewHeaderRowScan.setFilter(tableTypeFilter);
+ }
+
+ }
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+ Scan scan, RegionScanner s) throws IOException {
+ if (scan.getAttribute(CHECK_VERIFY_COLUMN) == null) {
+ return s;
+ }
+ return new ChildLinkMetaDataScanner(c.getEnvironment(), scan, new PagingRegionScanner(c.getEnvironment().getRegion(), s, scan));
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ReadRepairScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ReadRepairScanner.java
new file mode 100644
index 0000000000..ff22ae271a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ReadRepairScanner.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+/**
+ * Base region scanner implementing read repair: every row produced by the wrapped
+ * scanner is checked with {@link #verifyRow(List)} and, when it is not verified, handed
+ * to {@link #repairRow(List)} before being returned. Rows that the repair leaves empty
+ * are skipped.
+ */
+public abstract class ReadRepairScanner extends BaseRegionScanner {
+
+    public Logger LOGGER;
+    public RegionScanner scanner;
+    public Scan scan;
+    public RegionCoprocessorEnvironment env;
+    // Family and qualifier of the empty column, read from the scan attributes.
+    public byte[] emptyCF;
+    public byte[] emptyCQ;
+    public Region region;
+    public boolean hasMore;
+    // Time budget (ms) of a single next() call before a dummy result is returned.
+    public long pageSizeMs;
+    // Row limit taken from the scan's PageFilter when one is present; unlimited otherwise.
+    public long pageSize = Long.MAX_VALUE;
+    public long rowCount = 0;
+    public long maxTimestamp;
+    // Not assigned in this class. NOTE(review): presumably set by subclasses - confirm.
+    public long ageThreshold;
+    public boolean initialized = false;
+
+    public ReadRepairScanner(RegionCoprocessorEnvironment env, Scan scan, RegionScanner scanner) {
+        super(scanner);
+        LOGGER = LoggerFactory.getLogger(this.getClass());
+        this.env = env;
+        this.scan = scan;
+        this.scanner = scanner;
+        region = env.getRegion();
+        emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+        emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+        pageSizeMs = getPageSizeMsForRegionScanner(scan);
+        maxTimestamp = scan.getTimeRange().getMax();
+    }
+
+    /**
+     * One-time setup performed on the first call to next(). When
+     * {@code ScanUtil.removePageFilter} returns a PageFilter for the scan, its page size
+     * is recorded (the row limit is then enforced in next()) and the current scanner is
+     * closed and replaced by a new region scanner opened on the scan.
+     */
+    private void init() throws IOException {
+        if (!initialized) {
+            PageFilter pageFilter = ScanUtil.removePageFilter(scan);
+            if (pageFilter != null) {
+                pageSize = pageFilter.getPageSize();
+                // NOTE(review): the scanner passed to super(...) is not updated here;
+                // confirm that closing this scanner also closes the reopened one.
+                scanner.close();
+                scanner = region.getScanner(scan);
+            }
+            initialized = true;
+        }
+    }
+
+    /*
+    Method which checks whether a row is VERIFIED (i.e. does not need repair).
+     */
+    abstract boolean verifyRow(List<Cell> row);
+
+    /*
+    Method which repairs the given row
+     */
+    abstract void repairRow(List<Cell> row) throws IOException;
+
+    /**
+     * Fetches rows from the underlying scanner until one is verified, or has been
+     * repaired and still contains cells, and returns it in {@code result}.
+     * <p>
+     * If the time spent in this call reaches {@code pageSizeMs} while skipping rows and
+     * more rows remain, a dummy result keyed on the last row read is returned instead.
+     *
+     * @param result list the row's cells are added to
+     * @param raw whether to use {@code nextRaw} rather than {@code next} on the scanner
+     * @return false once {@code pageSize} rows have been returned; otherwise whether the
+     *         underlying scanner has more rows (always true for a dummy result)
+     * @throws IOException wrapping any failure, tagged with the region name
+     */
+    public boolean next(List<Cell> result, boolean raw) throws IOException {
+        try {
+            init();
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
+            do {
+                if (raw) {
+                    hasMore = scanner.nextRaw(result);
+                } else {
+                    hasMore = scanner.next(result);
+                }
+                if (result.isEmpty()) {
+                    return hasMore;
+                }
+                if (isDummy(result)) {
+                    return true;
+                }
+                // Capture the first cell now: the repair below may clear the result list,
+                // and its row key is still needed to build a dummy result.
+                Cell cell = result.get(0);
+                if (verifyRowAndRepairIfNecessary(result)) {
+                    break;
+                }
+                if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+                    byte[] rowKey = CellUtil.cloneRow(cell);
+                    result.clear();
+                    getDummyResult(rowKey, result);
+                    return true;
+                }
+                // skip this row as it is invalid
+                // if there is no more row, then result will be an empty list
+            } while (hasMore);
+            rowCount++;
+            if (rowCount == pageSize) {
+                return false;
+            }
+            return hasMore;
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+            return false; // impossible
+        }
+    }
+
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        return next(result, false);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> result) throws IOException {
+        return next(result, true);
+    }
+
+    /**
+     * Verifies the row and repairs it if it is not verified.
+     *
+     * @param cellList cells of the row; may be modified (including emptied) by the repair
+     * @return true if the row can be returned to the caller, i.e. it was already VERIFIED
+     *         or it still has cells after the repair; false if the repair left the row
+     *         empty and it must be skipped
+     * @throws IOException if the repair fails
+     */
+    private boolean verifyRowAndRepairIfNecessary(List<Cell> cellList) throws IOException {
+        if (verifyRow(cellList)) {
+            return true;
+        }
+        try {
+            repairRow(cellList);
+        } catch (IOException e) {
+            // Pass the exception as the last argument so the cause and stack trace
+            // are recorded alongside the region name.
+            LOGGER.warn("Row Repair failure on region {}.",
+                    env.getRegionInfo().getRegionNameAsString(), e);
+            throw e;
+        }
+        return !cellList.isEmpty();
+    }
+
+    /**
+     * Returns true when the cell's family and qualifier equal the empty column family
+     * and qualifier taken from the scan attributes.
+     */
+    public boolean isEmptyColumn(Cell cell) {
+        return Bytes.compareTo(
+                cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                emptyCF, 0, emptyCF.length) == 0 &&
+                Bytes.compareTo(
+                        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                        emptyCQ, 0, emptyCQ.length) == 0;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index f5ae114faf..7431d12418 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -73,6 +73,7 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
.put(TaskType.DROP_CHILD_VIEWS, "org.apache.phoenix.coprocessor.tasks.DropChildViewsTask")
.put(TaskType.INDEX_REBUILD, "org.apache.phoenix.coprocessor.tasks.IndexRebuildTask")
.put(TaskType.TRANSFORM_MONITOR, "org.apache.phoenix.coprocessor.tasks.TransformMonitorTask")
+ .put(TaskType.CHILD_LINK_SCAN, "org.apache.phoenix.coprocessor.tasks.ChildLinkScanTask")
.build();
public enum TaskResultCode {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/ChildLinkScanTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/ChildLinkScanTask.java
new file mode 100644
index 0000000000..f41200ad31
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/ChildLinkScanTask.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+/*
+Task to run a simple select * query on SYSTEM.CHILD_LINK table
+to trigger read repair and verify any unverified rows.
+ */
+public class ChildLinkScanTask extends BaseTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ChildLinkScanTask.class);
+ private static final String CHILD_LINK_QUERY = "SELECT COUNT(*) FROM SYSTEM.CHILD_LINK";
+ private static boolean isDisabled = false;
+
+ @VisibleForTesting
+ public static void disableChildLinkScanTask(boolean disable) {
+ isDisabled = disable;
+ }
+
+ @Override
+ public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+
+ if (isDisabled) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+ "ChildLinkScan task is disabled.");
+ }
+
+ int count = 0;
+ try {
+ PhoenixConnection pconn = QueryUtil.getConnectionOnServer(
+ env.getConfiguration()).unwrap(PhoenixConnection.class);
+ ResultSet rs = pconn.createStatement().executeQuery(CHILD_LINK_QUERY);
+ if (rs.next()) {
+ count = rs.getInt(1);
+ }
+ }
+ catch (Exception e) {
+ LOGGER.error("Exception in Child Link Scan Task: " + e);
+ return new TaskRegionObserver.TaskResult(
+ TaskRegionObserver.TaskResultCode.FAIL, e.getMessage());
+ }
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS,
+ "Number of rows in SYSTEM.CHILD_LINK: " + count);
+ }
+
+ @Override
+ public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord)
+ throws Exception {
+ return null;
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index c5f5b8fd52..752c46a76e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -49,6 +49,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
@@ -74,6 +75,8 @@ import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAIL
import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS;
import static org.apache.phoenix.monitoring.MetricType.TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS;
import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
+import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
@@ -95,6 +98,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
@@ -274,6 +278,8 @@ import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
@@ -291,7 +297,9 @@ import org.apache.phoenix.transaction.TransactionFactory.Provider;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.ConfigUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.JDBCUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.MetaDataUtil;
@@ -304,7 +312,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TimeKeeper;
import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -394,6 +401,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private ServerSideRPCControllerFactory serverSideRPCControllerFactory;
private boolean localIndexUpgradeRequired;
+
private static interface FeatureSupported {
boolean isSupported(ConnectionQueryServices services);
}
@@ -2158,29 +2166,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
// Avoid the client-server RPC if this is not a view creation
+ // For view creation, write UNVERIFIED child_link rows
if (!childLinkMutations.isEmpty()) {
- // Send mutations for parent-child links to SYSTEM.CHILD_LINK
- // We invoke this using rowKey available in the first element
- // of childLinkMutations.
- final byte[] rowKey = childLinkMutations.get(0).getRow();
- final RpcController controller = getController(PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_TABLE_NAME);
- final MetaDataMutationResult result =
- childLinkMetaDataCoprocessorExec(rowKey,
- new ChildLinkMetaDataServiceCallBack(controller, childLinkMutations));
-
- switch (result.getMutationCode()) {
- case UNABLE_TO_CREATE_CHILD_LINK:
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_CREATE_CHILD_LINK)
- .setSchemaName(Bytes.toString(schemaBytes))
- .setTableName(Bytes.toString(physicalTableNameBytes)).build().buildException();
- default:
- break;
- }
+ sendChildLinkMutations(childLinkMutations, false, false, physicalTableNameBytes, schemaBytes);
}
// Send the remaining metadata mutations to SYSTEM.CATALOG
byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
- return metaDataCoprocessorExec(SchemaUtil.getPhysicalHBaseTableName(schemaBytes, tableBytes,
+ MetaDataMutationResult result = metaDataCoprocessorExec(SchemaUtil.getPhysicalHBaseTableName(schemaBytes, tableBytes,
SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, this.props)).toString(),
tableKey,
new Batch.Call<MetaDataService, MetaDataResponse>() {
@@ -2207,6 +2200,89 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return rpcCallback.get();
}
});
+
+ //For view creation, if SYSCAT rpc succeeds, mark child_link rows as VERIFIED
+ if (!childLinkMutations.isEmpty()) {
+ try {
+ if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
+ sendChildLinkMutations(childLinkMutations, true, false, physicalTableNameBytes, schemaBytes);
+ } else {
+ sendChildLinkMutations(childLinkMutations, true, true, physicalTableNameBytes, schemaBytes);
+ }
+ }
+ catch (SQLException e) {
+ //unverified rows will be repaired during read
+ LOGGER.debug("Exception in phase-3 of view creation: " + e.getMessage());
+ addChildLinkScanTask();
+ }
+ }
+ return result;
+ }
+
+    /**
+     * Helper method to send mutations to SYSTEM.CHILD_LINK using its endpoint coprocessor.
+     * <p>
+     * For each input mutation, one mutation is sent:
+     * <ul>
+     *   <li>{@code isDelete}: a Delete of the mutation's row ({@code isVerified} is ignored);</li>
+     *   <li>otherwise, {@code isVerified}: a new Put on the row that only sets the empty
+     *       column to VERIFIED;</li>
+     *   <li>otherwise: the input mutation itself, which must be a Put and is modified in
+     *       place by setting its empty column to UNVERIFIED.</li>
+     * </ul>
+     * An empty input list is a no-op.
+     *
+     * @param mutations child link mutations whose rows are to be written or deleted
+     * @param isVerified whether to write VERIFIED (true) or UNVERIFIED (false) puts
+     * @param isDelete whether to delete the rows instead of writing them
+     * @param physicalTableNameBytes table name reported in the exception on failure
+     * @param schemaBytes schema name reported in the exception on failure
+     * @throws SQLException with code UNABLE_TO_CREATE_CHILD_LINK when the coprocessor
+     *         returns that mutation code
+     */
+    public void sendChildLinkMutations(List<Mutation> mutations, boolean isVerified, boolean isDelete,
+            byte[] physicalTableNameBytes, byte[] schemaBytes)
+            throws SQLException {
+
+        // Nothing to send; also avoids an IndexOutOfBoundsException on get(0) below.
+        if (mutations.isEmpty()) {
+            return;
+        }
+
+        // get empty column information
+        PTable childLinkLogicalTable = getTable(null,
+                PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME, HConstants.LATEST_TIMESTAMP);
+        byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(childLinkLogicalTable);
+        byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(childLinkLogicalTable).getFirst();
+
+        // add empty column value to mutations or create delete mutations for phase-3
+        List<Mutation> childLinkMutations = new ArrayList<>(mutations.size());
+        for (Mutation m : mutations) {
+            if (isDelete) {
+                Delete delete = new Delete(m.getRow());
+                childLinkMutations.add(delete);
+            } else {
+                Put put = isVerified ? new Put(m.getRow()) : (Put) m;
+                byte[] emptyColumnValue = isVerified ? VERIFIED_BYTES : UNVERIFIED_BYTES;
+                put.addColumn(emptyCF, emptyCQ, emptyColumnValue);
+                childLinkMutations.add(put);
+            }
+        }
+
+        // Send mutations for parent-child links to SYSTEM.CHILD_LINK
+        // We invoke this using rowKey available in the first element
+        // of childLinkMutations.
+        final byte[] rowKey = childLinkMutations.get(0).getRow();
+        final RpcController controller =
+                getController(PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_TABLE_NAME);
+        final MetaDataMutationResult result =
+                childLinkMetaDataCoprocessorExec(rowKey,
+                        new ChildLinkMetaDataServiceCallBack(controller, childLinkMutations));
+
+        switch (result.getMutationCode()) {
+            case UNABLE_TO_CREATE_CHILD_LINK:
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_CREATE_CHILD_LINK)
+                        .setSchemaName(Bytes.toString(schemaBytes))
+                        .setTableName(Bytes.toString(physicalTableNameBytes)).build().buildException();
+            default:
+                break;
+        }
+    }
+
+ /*
+ Add a task to SYSTEM.TASK table which does a simple select * query on SYSTEM.CHILD_LINK table.
+ This should trigger the read repair step and verify any unverified rows.
+ */
+ private void addChildLinkScanTask() {
+
+ try {
+ PhoenixConnection conn = QueryUtil.getConnection(config).unwrap(PhoenixConnection.class);
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(conn)
+ .setTaskType(PTable.TaskType.CHILD_LINK_SCAN)
+ .setTaskStatus(PTable.TaskStatus.CREATED.toString())
+ .setStartTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()))
+ .setTableName(SYSTEM_CHILD_LINK_TABLE)
+ .build());
+ } catch (Throwable t) {
+ LOGGER.error("Adding a task to scan SYSTEM.CHILD_LINK failed!", t);
+ }
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e4885a7910..53252941a9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -350,6 +350,8 @@ public interface QueryServices extends SQLCloseable {
public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
// The time limit on the amount of work to be done in one RPC call
public static final String PHOENIX_SERVER_PAGE_SIZE_MS = "phoenix.server.page.size.ms";
+ // The minimum age of an unverified child link row to be eligible for deletion
+ public static final String CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB = "phoenix.child.link.row.age.threshold.to.delete.ms";
// Phoenix TTL implemented by CompactionScanner and TTLRegionScanner is enabled
public static final String PHOENIX_TABLE_TTL_ENABLED = "phoenix.table.ttl.enabled";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index adb3b55e2f..78305e50cc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -403,6 +403,8 @@ public class QueryServicesOptions {
public static final int DEFAULT_MAX_REGION_LOCATIONS_SIZE_EXPLAIN_PLAN = 5;
+ public static final long DEFAULT_CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS = 1 * 60 * 60 * 1000; // 1 hour
+
private final Configuration config;
private QueryServicesOptions(Configuration config) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 1798d444b6..6e44586363 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -204,7 +204,8 @@ public interface PTable extends PMetaDataEntity {
public enum TaskType {
DROP_CHILD_VIEWS((byte)1),
INDEX_REBUILD((byte)2),
- TRANSFORM_MONITOR((byte)3);
+ TRANSFORM_MONITOR((byte)3),
+ CHILD_LINK_SCAN((byte)4);
private final byte[] byteValue;
private final byte serializedValue;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 7f2690ed77..3928142f95 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -79,6 +79,7 @@ import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.KeyRange.Bound;
import org.apache.phoenix.query.QueryConstants;
@@ -1302,10 +1303,22 @@ public class ScanUtil {
}
}
+    /**
+     * For scans over SYSTEM.CHILD_LINK, sets the CHECK_VERIFY_COLUMN attribute, records
+     * the table's empty column family and qualifier as scan attributes, and adds the
+     * empty column to the scan. Scans over any other table are left unchanged.
+     *
+     * @param scan scan to decorate
+     * @param table table the scan targets
+     * @param phoenixConnection not used by this method
+     */
+    private static void setScanAtrributesForChildLinkRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) {
+        String tableName = table.getName().getString();
+        if (!tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME)) {
+            return;
+        }
+        scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
+        byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
+        byte[] emptyColumnQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamily);
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnQualifier);
+        addEmptyColumnToScan(scan, emptyColumnFamily, emptyColumnQualifier);
+    }
+
public static void setScanAttributesForClient(Scan scan, PTable table,
PhoenixConnection phoenixConnection) throws SQLException {
setScanAttributesForIndexReadRepair(scan, table, phoenixConnection);
setScanAttributesForPhoenixTTL(scan, table, phoenixConnection);
+ setScanAtrributesForChildLinkRepair(scan, table, phoenixConnection);
byte[] emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME);
byte[] emptyCQ = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME);
if (emptyCF != null && emptyCQ != null) {