You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/11/26 10:39:34 UTC
[05/12] phoenix git commit: PHOENIX-4764 Cleanup metadata of child
views for a base table that has been dropped
PHOENIX-4764 Cleanup metadata of child views for a base table that has been dropped
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4bbe8e20
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4bbe8e20
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4bbe8e20
Branch: refs/heads/4.x-cdh5.15
Commit: 4bbe8e20960e615c0136aa130f1e41587d7e742f
Parents: b82843b
Author: Kadir <ko...@salesforce.com>
Authored: Thu Sep 27 07:32:31 2018 +0100
Committer: pboado <pe...@gmail.com>
Committed: Sun Nov 25 22:09:06 2018 +0000
----------------------------------------------------------------------
.../phoenix/end2end/BasePermissionsIT.java | 4 +-
.../phoenix/end2end/DropTableWithViewsIT.java | 151 ++++++++++
.../end2end/QueryDatabaseMetaDataIT.java | 4 +
.../end2end/TenantSpecificTablesDDLIT.java | 4 +-
.../coprocessor/MetaDataEndpointImpl.java | 46 ++-
.../phoenix/coprocessor/TaskRegionObserver.java | 292 +++++++++++++++++++
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 9 +-
.../query/ConnectionQueryServicesImpl.java | 20 +-
.../query/ConnectionlessQueryServicesImpl.java | 9 +
.../apache/phoenix/query/QueryConstants.java | 17 +-
.../org/apache/phoenix/query/QueryServices.java | 6 +
.../phoenix/query/QueryServicesOptions.java | 4 +
.../java/org/apache/phoenix/schema/PTable.java | 31 +-
.../phoenix/schema/stats/StatisticsUtil.java | 2 +
.../org/apache/phoenix/util/SchemaUtil.java | 10 +
.../java/org/apache/phoenix/query/BaseTest.java | 1 +
16 files changed, 589 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
index 88a942e..932ce9f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -428,7 +428,7 @@ public class BasePermissionsIT extends BaseTest {
@Override
public Object run() throws Exception {
try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
- assertFalse(stmt.execute("DROP TABLE IF EXISTS " + tableName));
+ assertFalse(stmt.execute(String.format("DROP TABLE IF EXISTS %s CASCADE", tableName)));
}
return null;
}
@@ -653,7 +653,7 @@ public class BasePermissionsIT extends BaseTest {
@Override
public Object run() throws Exception {
try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
- assertFalse(stmt.execute("DROP VIEW " + viewName));
+ assertFalse(stmt.execute(String.format("DROP VIEW %s CASCADE", viewName)));
}
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
new file mode 100644
index 0000000..9502218
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.phoenix.coprocessor.TableViewFinderResult;
+import org.apache.phoenix.coprocessor.ViewFinder;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Verifies that dropping a base table with CASCADE eventually removes its whole tree of child
+ * views. The test is run for every combination of multi-tenancy and column encoding.
+ */
+@RunWith(Parameterized.class)
+public class DropTableWithViewsIT extends SplitSystemCatalogIT {
+
+    private final boolean isMultiTenant;
+    private final boolean columnEncoded;
+    private final String TENANT_SPECIFIC_URL1 = getUrl() + ';' + TENANT_ID_ATTRIB + "=" + TENANT1;
+
+    public DropTableWithViewsIT(boolean isMultiTenant, boolean columnEncoded) {
+        this.isMultiTenant = isMultiTenant;
+        this.columnEncoded = columnEncoded;
+    }
+
+    @Parameters(name="DropTableWithViewsIT_multiTenant={0}, columnEncoded={1}") // name is used by failsafe as file name in reports
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {
+                { false, false }, { false, true },
+                { true, false }, { true, true } });
+    }
+
+    /** Same as {@link #generateDDL(String, String)} without any additional table options. */
+    private String generateDDL(String format) {
+        return generateDDL("", format);
+    }
+
+    /**
+     * Fills in a DDL format string for the current test parameters.
+     *
+     * @param options comma separated table options to start with, may be empty
+     * @param format format string with three placeholders, in this order: the tenant id column
+     *            definition, the tenant id primary key column, the table options
+     * @return the DDL statement
+     */
+    private String generateDDL(String options, String format) {
+        StringBuilder optionsBuilder = new StringBuilder(options);
+        if (!columnEncoded) {
+            if (optionsBuilder.length() != 0) {
+                optionsBuilder.append(",");
+            }
+            optionsBuilder.append("COLUMN_ENCODED_BYTES=0");
+        }
+        if (isMultiTenant) {
+            if (optionsBuilder.length() != 0) {
+                optionsBuilder.append(",");
+            }
+            optionsBuilder.append("MULTI_TENANT=true");
+        }
+        String tenantColumn = isMultiTenant ? "TENANT_ID VARCHAR NOT NULL, " : "";
+        String tenantPkColumn = isMultiTenant ? "TENANT_ID, " : "";
+        return String.format(format, tenantColumn, tenantPkColumn, optionsBuilder.toString());
+    }
+
+    @Test
+    public void testDropTableWithChildViews() throws Exception {
+        String baseTable = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Connection viewConn =
+                        isMultiTenant ? DriverManager.getConnection(TENANT_SPECIFIC_URL1) : conn) {
+            String ddlFormat =
+                    "CREATE TABLE IF NOT EXISTS " + baseTable + " ("
+                            + " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR "
+                            + " CONSTRAINT NAME_PK PRIMARY KEY (%s PK2)" + " ) %s";
+            conn.createStatement().execute(generateDDL(ddlFormat));
+            conn.commit();
+            // Create a view tree (i.e., tree of views) with depth of 2 and fanout factor of 4
+            for (int i = 0; i < 4; i++) {
+                String childView = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+                String childViewDDL = "CREATE VIEW " + childView + " AS SELECT * FROM " + baseTable;
+                viewConn.createStatement().execute(childViewDDL);
+                for (int j = 0; j < 4; j++) {
+                    String grandChildView = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+                    String grandChildViewDDL = "CREATE VIEW " + grandChildView + " AS SELECT * FROM " + childView;
+                    viewConn.createStatement().execute(grandChildViewDDL);
+                }
+            }
+            // Drop the base table
+            String dropTable = String.format("DROP TABLE IF EXISTS %s CASCADE", baseTable);
+            conn.createStatement().execute(dropTable);
+
+            // Wait for the tasks for dropping child views to complete. The depth of the view tree is 2, so we
+            // expect that this will be done in two task handling runs, i.e., in three task handling intervals
+            // at most in general by assuming that each non-root level will be processed in one interval.
+            // To be on the safe side, we will wait at most 10 intervals.
+            long halfTimeInterval = config.getLong(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS)/2;
+            String pendingTasksQuery = "SELECT * " +
+                    " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+                    " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
+                    PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue();
+            boolean timedOut = true;
+            Thread.sleep(3 * halfTimeInterval);
+            for (int i = 3; i < 20; i++) {
+                // Look at the result right after running the query and only sleep when there are
+                // still pending tasks, so that the test does not wait longer than necessary
+                try (ResultSet pendingTasks = conn.createStatement().executeQuery(pendingTasksQuery)) {
+                    if (!pendingTasks.next()) {
+                        timedOut = false;
+                        break;
+                    }
+                }
+                Thread.sleep(halfTimeInterval);
+            }
+            if (timedOut) {
+                fail("Drop child view task execution timed out!");
+            }
+            // Views should be dropped by now
+            TableName linkTable = TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES);
+            TableViewFinderResult childViewsResult = new TableViewFinderResult();
+            ViewFinder.findAllRelatives(getUtility().getConnection().getTable(linkTable),
+                    HConstants.EMPTY_BYTE_ARRAY,
+                    SchemaUtil.getSchemaNameFromFullName(baseTable).getBytes(),
+                    SchemaUtil.getTableNameFromFullName(baseTable).getBytes(),
+                    PTable.LinkType.CHILD_TABLE,
+                    childViewsResult);
+            assertTrue("Child links of " + baseTable + " have not been removed",
+                    childViewsResult.getLinks().size() == 0);
+            // There should not be any orphan views
+            try (ResultSet orphanViews = conn.createStatement().executeQuery(
+                    "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
+                    " WHERE " + PhoenixDatabaseMetaData.TABLE_SCHEM + " = '" + SCHEMA2 +"'")) {
+                assertFalse(orphanViews.next());
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index 76d8e19..2aa6db8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -186,6 +186,10 @@ public class QueryDatabaseMetaDataIT extends ParallelStatsDisabledIT {
assertEquals(PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, rs.getString("TABLE_NAME"));
assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
assertTrue(rs.next());
+ assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
+ assertEquals(PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE, rs.getString("TABLE_NAME"));
+ assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE"));
+ assertTrue(rs.next());
assertEquals(null, rs.getString("TABLE_SCHEM"));
assertEquals(tableAName, rs.getString("TABLE_NAME"));
assertEquals(PTableType.TABLE.toString(), rs.getString("TABLE_TYPE"));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index 956b43c..85c9128 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -503,8 +503,10 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE, PTableType.SYSTEM);
assertTrue(rs.next());
assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, PTableType.SYSTEM);
+ assertTrue(rs.next());
+ assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE, PTableType.SYSTEM);
assertFalse(rs.next());
-
+
rs = meta.getTables(null, "", StringUtil.escapeLike(tenantTable2), new String[] {TABLE.getValue().getString()});
assertFalse(rs.next());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index aa78b1b..424b6d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -86,6 +86,7 @@ import java.security.PrivilegedExceptionAction;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -1936,7 +1937,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
table = loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP,
clientVersion);
} catch (ParentTableNotFoundException e) {
- dropChildMetadata(e.getParentSchemaName(), e.getParentTableName(), e.getParentTenantId());
+ dropChildViews(env, e.getParentTenantId(), e.getParentSchemaName(), e.getParentTableName());
}
if (table != null) {
if (table.getTimeStamp() < clientTimeStamp) {
@@ -1961,7 +1962,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// check if the table was dropped, but had child views that were have not yet
// been cleaned up by compaction
if (!Bytes.toString(schemaName).equals(QueryConstants.SYSTEM_SCHEMA_NAME)) {
- dropChildMetadata(schemaName, tableName, tenantIdBytes);
+ dropChildViews(env, tenantIdBytes, schemaName, tableName);
}
byte[] parentTableKey = null;
@@ -2328,19 +2329,31 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
- private void dropChildMetadata(byte[] schemaName, byte[] tableName, byte[] tenantIdBytes)
+ public static void dropChildViews(RegionCoprocessorEnvironment env, byte[] tenantIdBytes, byte[] schemaName, byte[] tableName)
throws IOException, SQLException, ClassNotFoundException {
- TableViewFinderResult childViewsResult = new TableViewFinderResult();
- findAllChildViews(tenantIdBytes, schemaName, tableName, childViewsResult);
+ Table hTable =
+ ServerUtil.getHTableForCoprocessorScan(env,
+ SchemaUtil.getPhysicalTableName(
+ PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES,
+ env.getConfiguration()).getName());
+ TableViewFinderResult childViewsResult = ViewFinder.findRelatedViews(hTable, tenantIdBytes, schemaName, tableName,
+ PTable.LinkType.CHILD_TABLE, HConstants.LATEST_TIMESTAMP);
+
if (childViewsResult.hasLinks()) {
+
for (TableInfo viewInfo : childViewsResult.getLinks()) {
byte[] viewTenantId = viewInfo.getTenantId();
byte[] viewSchemaName = viewInfo.getSchemaName();
byte[] viewName = viewInfo.getTableName();
+ if (logger.isDebugEnabled()) {
+ logger.debug("dropChildViews :" + Bytes.toString(schemaName) + "." + Bytes.toString(tableName) +
+ " -> " + Bytes.toString(viewSchemaName) + "." + Bytes.toString(viewName) +
+ "with tenant id :" + Bytes.toString(viewTenantId));
+ }
Properties props = new Properties();
if (viewTenantId != null && viewTenantId.length != 0)
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, Bytes.toString(viewTenantId));
- try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration())
+ try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(props, env.getConfiguration())
.unwrap(PhoenixConnection.class)) {
MetaDataClient client = new MetaDataClient(connection);
org.apache.phoenix.parse.TableName viewTableName = org.apache.phoenix.parse.TableName
@@ -2351,7 +2364,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
}
-
private boolean execeededIndexQuota(PTableType tableType, PTable parentTable) {
return PTableType.INDEX == tableType && parentTable.getIndexes().size() >= maxIndexesPerTable;
}
@@ -2514,6 +2526,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
throw new IllegalStateException(msg);
}
+
// drop rows from catalog on this region
mutateRowsWithLocks(region, localMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
HConstants.NO_NONCE);
@@ -2631,7 +2644,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
EnvironmentEdgeManager.currentTimeMillis(), null);
}
- if (tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) {
+ if (tableType == PTableType.TABLE || tableType == PTableType.VIEW || tableType == PTableType.SYSTEM) {
// check to see if the table has any child views
try (Table hTable =
env.getTable(SchemaUtil.getPhysicalTableName(
@@ -2640,10 +2653,19 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
boolean hasChildViews =
ViewFinder.hasChildViews(hTable, tenantId, schemaName, tableName,
clientTimeStamp);
- if (hasChildViews && !isCascade) {
- // DROP without CASCADE on tables with child views is not permitted
- return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
- EnvironmentEdgeManager.currentTimeMillis(), null);
+ if (hasChildViews) {
+ if (!isCascade) {
+ // DROP without CASCADE on tables with child views is not permitted
+ return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
+ EnvironmentEdgeManager.currentTimeMillis(), null);
+ }
+ try {
+ PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+ TaskRegionObserver.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS, Bytes.toString(tenantId),
+ Bytes.toString(schemaName), Bytes.toString(tableName), this.accessCheckEnabled);
+ } catch (Throwable t) {
+ logger.error("Adding a task to drop child views failed!", t);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
new file mode 100644
index 0000000..ca71961
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+
+import java.util.Properties;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.ipc.RpcServer.Call;
+import org.apache.hadoop.hbase.ipc.RpcUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.TaskType;
+
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+
+/**
+ * Coprocessor for task related operations. This coprocessor would only be registered
+ * to SYSTEM.TASK table. Once the region is open, it periodically runs a
+ * {@link DropChildViewsTask} that handles the DROP_CHILD_VIEWS rows of SYSTEM.TASK.
+ */
+public class TaskRegionObserver extends BaseRegionObserver {
+    public static final Log LOG = LogFactory.getLog(TaskRegionObserver.class);
+    // initial delay before the first task is handled
+    private static final long INITIAL_DELAY_MS = 10000; // 10 secs
+
+    protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(TaskType.values().length);
+    private long timeInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS;
+    private long timeMaxInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS;
+
+    /** Stops the periodic task handling when the region is about to close. */
+    @Override
+    public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
+            boolean abortRequested) {
+        executor.shutdownNow();
+    }
+
+    /** Reads the task handling intervals from the coprocessor configuration. */
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        Configuration config = env.getConfiguration();
+        timeInterval = config.getLong(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
+                QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS);
+        timeMaxInterval = config.getLong(QueryServices.TASK_HANDLING_MAX_INTERVAL_MS_ATTRIB,
+                QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+    }
+
+    /** Schedules the periodic {@link DropChildViewsTask} once the region is open. */
+    @Override
+    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
+        final RegionCoprocessorEnvironment env = e.getEnvironment();
+
+        // turn off verbose deprecation logging
+        Logger deprecationLogger = Logger.getLogger("org.apache.hadoop.conf.Configuration.deprecation");
+        if (deprecationLogger != null) {
+            deprecationLogger.setLevel(Level.WARN);
+        }
+
+        DropChildViewsTask task = new DropChildViewsTask(env, timeMaxInterval);
+        executor.scheduleWithFixedDelay(task, INITIAL_DELAY_MS, timeInterval, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Executes the statement and commits it on the connection. When access checks are enabled,
+     * this is done as the HBase login user with the RPC context temporarily cleared.
+     *
+     * @throws IOException if executing or committing the statement fails
+     */
+    private static void mutateSystemTaskTable(final PhoenixConnection conn, final PreparedStatement stmt,
+            boolean accessCheckEnabled) throws IOException {
+        if (!accessCheckEnabled) {
+            executeAndCommit(conn, stmt);
+            return;
+        }
+        // we need to mutate SYSTEM.TASK with HBase/login user if access is enabled.
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                final Call rpcContext = RpcUtil.getRpcContext();
+                // setting RPC context as null so that user can be reset
+                try {
+                    RpcUtil.setRpcContext(null);
+                    executeAndCommit(conn, stmt);
+                } finally {
+                    // setting RPC context back to original context of the RPC
+                    RpcUtil.setRpcContext(rpcContext);
+                }
+                return null;
+            }
+        });
+    }
+
+    /** Runs the statement and commits, rethrowing any SQL failure as an IOException. */
+    private static void executeAndCommit(PhoenixConnection conn, PreparedStatement stmt) throws IOException {
+        try {
+            stmt.execute();
+            conn.commit();
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Adds a task row to SYSTEM.TASK.
+     *
+     * @param conn connection used to upsert and commit the row
+     * @param taskType type of the task
+     * @param tenantId tenant id of the table the task is for, may be null
+     * @param schemaName schema name of the table the task is for, may be null
+     * @param tableName name of the table the task is for
+     * @param accessCheckEnabled whether the row has to be written as the HBase login user
+     * @throws IOException if preparing, executing or committing the statement fails
+     */
+    public static void addTask(PhoenixConnection conn, TaskType taskType, String tenantId, String schemaName,
+            String tableName, boolean accessCheckEnabled) throws IOException {
+        String upsert = "UPSERT INTO " +
+                PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
+                PhoenixDatabaseMetaData.TASK_TYPE + ", " +
+                PhoenixDatabaseMetaData.TENANT_ID + ", " +
+                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+                PhoenixDatabaseMetaData.TABLE_NAME + " ) VALUES(?,?,?,?)";
+        // try-with-resources releases the statement once the row has been committed
+        try (PreparedStatement stmt = conn.prepareStatement(upsert)) {
+            stmt.setByte(1, taskType.getSerializedValue());
+            if (tenantId != null) {
+                stmt.setString(2, tenantId);
+            } else {
+                stmt.setNull(2, Types.VARCHAR);
+            }
+            if (schemaName != null) {
+                stmt.setString(3, schemaName);
+            } else {
+                stmt.setNull(3, Types.VARCHAR);
+            }
+            stmt.setString(4, tableName);
+            mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Deletes the task row identified by the given task type, timestamp, tenant id, schema name
+     * and table name from SYSTEM.TASK. A null tenant id or schema name matches rows where that
+     * column is null.
+     *
+     * @param conn connection used to delete and commit
+     * @param accessCheckEnabled whether the row has to be deleted as the HBase login user
+     * @throws IOException if preparing, executing or committing the statement fails
+     */
+    public static void deleteTask(PhoenixConnection conn, TaskType taskType, Timestamp ts, String tenantId,
+            String schemaName, String tableName, boolean accessCheckEnabled) throws IOException {
+        // tenant id and schema name are bound as parameters instead of being concatenated into
+        // the statement, so values containing quotes can neither break nor alter the DELETE
+        String delete = "DELETE FROM " +
+                PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+                " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND " +
+                PhoenixDatabaseMetaData.TASK_TS + " = ? AND " +
+                PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? " IS NULL " : " = ? ") + " AND " +
+                PhoenixDatabaseMetaData.TABLE_SCHEM + (schemaName == null ? " IS NULL " : " = ? ") + " AND " +
+                PhoenixDatabaseMetaData.TABLE_NAME + " = ?";
+        try (PreparedStatement stmt = conn.prepareStatement(delete)) {
+            // the number of placeholders depends on which of the optional values are present
+            int index = 1;
+            stmt.setByte(index++, taskType.getSerializedValue());
+            stmt.setTimestamp(index++, ts);
+            if (tenantId != null) {
+                stmt.setString(index++, tenantId);
+            }
+            if (schemaName != null) {
+                stmt.setString(index++, schemaName);
+            }
+            stmt.setString(index, tableName);
+            mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Task runs periodically to clean up task of child views whose parent is dropped
+     *
+     */
+    public static class DropChildViewsTask extends TimerTask {
+        private final RegionCoprocessorEnvironment env;
+        private final long timeMaxInterval;
+        private final boolean accessCheckEnabled;
+
+        /**
+         * @param env environment of the region this task runs for
+         * @param timeMaxInterval time in milliseconds after which a task whose parent table
+         *            still exists is considered expired and removed
+         */
+        public DropChildViewsTask(RegionCoprocessorEnvironment env, long timeMaxInterval) {
+            this.env = env;
+            this.accessCheckEnabled = env.getConfiguration().getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
+                    QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
+            this.timeMaxInterval = timeMaxInterval;
+        }
+
+        /** Scans SYSTEM.TASK for DROP_CHILD_VIEWS tasks and handles them one by one. */
+        @Override
+        public void run() {
+            String taskQuery = "SELECT " +
+                    PhoenixDatabaseMetaData.TASK_TS + ", " +
+                    PhoenixDatabaseMetaData.TENANT_ID + ", " +
+                    PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+                    PhoenixDatabaseMetaData.TABLE_NAME +
+                    " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+                    " WHERE "+ PhoenixDatabaseMetaData.TASK_TYPE + " = " + PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue();
+            PhoenixConnection connForTask = null;
+            try {
+                connForTask = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+                try (PreparedStatement taskStatement = connForTask.prepareStatement(taskQuery);
+                        ResultSet rs = taskStatement.executeQuery()) {
+                    while (rs.next()) {
+                        handleTask(connForTask, rs);
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.error("DropChildViewsTask failed!", t);
+            } finally {
+                closeQuietly(connForTask);
+            }
+        }
+
+        /**
+         * Handles the task at the current row of {@code rs}. A failure is logged and the task is
+         * left in SYSTEM.TASK so that it is picked up again by the next scan.
+         */
+        private void handleTask(PhoenixConnection connForTask, ResultSet rs) {
+            // declared per row so that a failure is never logged with the values of a previous task
+            Timestamp timestamp = null;
+            String tenantId = null;
+            String schemaName = null;
+            String tableName = null;
+            try {
+                timestamp = rs.getTimestamp(1);
+                tenantId = rs.getString(2);
+                byte[] tenantIdBytes = rs.getBytes(2);
+                schemaName = rs.getString(3);
+                byte[] schemaNameBytes = rs.getBytes(3);
+                tableName = rs.getString(4);
+                byte[] tableNameBytes = rs.getBytes(4);
+
+                PhoenixConnection pconn;
+                if (tenantId != null) {
+                    Properties tenantProps = new Properties();
+                    tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+                    pconn = QueryUtil.getConnectionOnServer(tenantProps, env.getConfiguration()).unwrap(PhoenixConnection.class);
+                } else {
+                    pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+                }
+                MetaDataProtocol.MetaDataMutationResult result;
+                try {
+                    result = new MetaDataClient(pconn).updateCache(pconn.getTenantId(),
+                            schemaName, tableName, true);
+                } finally {
+                    // this connection is only needed for the lookup of the parent table
+                    closeQuietly(pconn);
+                }
+
+                // delete child views only if the parent table is deleted from the system catalog
+                if (result.getMutationCode() != MetaDataProtocol.MutationCode.TABLE_ALREADY_EXISTS) {
+                    MetaDataEndpointImpl.dropChildViews(env, tenantIdBytes, schemaNameBytes, tableNameBytes);
+                } else if (System.currentTimeMillis() < timeMaxInterval + timestamp.getTime()) {
+                    // skip this task as it has not been expired and its parent table has not been dropped yet
+                    LOG.info("Skipping a child view drop task. The parent table has not been dropped yet : " +
+                            describeTask(schemaName, tableName, tenantId, timestamp));
+                    return;
+                } else {
+                    LOG.warn(" A drop child view task has expired and will be removed from the system task table : " +
+                            describeTask(schemaName, tableName, tenantId, timestamp));
+                }
+
+                deleteTask(connForTask, PTable.TaskType.DROP_CHILD_VIEWS, timestamp, tenantId, schemaName,
+                        tableName, this.accessCheckEnabled);
+            } catch (Throwable t) {
+                // the timestamp may still be null here, describeTask() copes with that
+                LOG.warn("Exception while dropping a child view task. " +
+                        "It will be retried in the next system task table scan : " +
+                        describeTask(schemaName, tableName, tenantId, timestamp), t);
+            }
+        }
+
+        /** Builds the part of a log message that identifies a task; every argument may be null. */
+        private static String describeTask(String schemaName, String tableName, String tenantId,
+                Timestamp timestamp) {
+            return schemaName + "." + tableName +
+                    " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) +
+                    " and timestamp " + timestamp;
+        }
+
+        /** Closes the connection if there is one; a failure to close is only logged. */
+        private static void closeQuietly(PhoenixConnection conn) {
+            if (conn == null) {
+                return;
+            }
+            try {
+                conn.close();
+            } catch (SQLException ignored) {
+                LOG.debug("DropChildViewsTask can't close connection", ignored);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 52dfe99..3ff62e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -213,6 +213,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final byte[] INDEX_TYPE_BYTES = Bytes.toBytes(INDEX_TYPE);
public static final String LINK_TYPE = "LINK_TYPE";
public static final byte[] LINK_TYPE_BYTES = Bytes.toBytes(LINK_TYPE);
+ public static final String TASK_TYPE = "TASK_TYPE";
+ public static final byte[] TASK_TYPE_BYTES = Bytes.toBytes(TASK_TYPE);
+ public static final String TASK_TS = "TASK_TS";
+ public static final byte[] TASK_TS_BYTES = Bytes.toBytes(TASK_TS);
public static final String ARRAY_SIZE = "ARRAY_SIZE";
public static final byte[] ARRAY_SIZE_BYTES = Bytes.toBytes(ARRAY_SIZE);
public static final String VIEW_CONSTANT = "VIEW_CONSTANT";
@@ -361,7 +365,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME);
public static final TableName SYSTEM_LINK_HBASE_TABLE_NAME = TableName.valueOf(SYSTEM_CHILD_LINK_NAME);
-
+ public static final String SYSTEM_TASK_TABLE = "TASK";
+ public static final String SYSTEM_TASK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TASK_TABLE);
+ public static final byte[] SYSTEM_TASK_NAME_BYTES = Bytes.toBytes(SYSTEM_TASK_NAME);
+ public static final TableName SYSTEM_TASK_HBASE_TABLE_NAME = TableName.valueOf(SYSTEM_TASK_NAME);
//SYSTEM:LOG
public static final String SYSTEM_LOG_TABLE = "LOG";
public static final String QUERY_ID = "QUERY_ID";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 37cdd21..e750d22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -152,6 +152,7 @@ import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
import org.apache.phoenix.coprocessor.ScanRegionObserver;
import org.apache.phoenix.coprocessor.SequenceRegionObserver;
import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
@@ -958,7 +959,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) {
descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null);
}
+ } else if (SchemaUtil.isTaskTable(tableName)) {
+ if(!descriptor.hasCoprocessor(TaskRegionObserver.class.getName())) {
+ descriptor.addCoprocessor(TaskRegionObserver.class.getName(), null, priority, null);
}
+ }
if (isTransactional) {
Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
@@ -2812,6 +2817,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA);
}
+ protected String getTaskDDL() { // builds the CREATE TABLE statement for the task table
+ return setSystemDDLProperties(QueryConstants.CREATE_TASK_METADATA); // fills the template's format placeholders from props
+ }
+
private String setSystemDDLProperties(String ddl) {
return String.format(ddl,
props.getInt(DEFAULT_SYSTEM_MAX_VERSIONS_ATTRIB, QueryServicesOptions.DEFAULT_SYSTEM_MAX_VERSIONS),
@@ -3038,6 +3047,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
metaConnection.createStatement().executeUpdate(getMutexDDL());
} catch (TableAlreadyExistsException e) {}
+ try {
+ metaConnection.createStatement().executeUpdate(getTaskDDL());
+ } catch (TableAlreadyExistsException e) {}
+
// Catch the IOException to log the error message and then bubble it up for the client to retry.
}
@@ -3496,6 +3509,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
metaConnection.createStatement().executeUpdate(getMutexDDL());
} catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
+ try {
+ metaConnection.createStatement().executeUpdate(getTaskDDL());
+ } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
// In case namespace mapping is enabled and system table to system namespace mapping is also enabled,
// create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work
@@ -3733,8 +3749,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// No tables exist matching "SYSTEM\..*", they are all already in "SYSTEM:.*"
if (tableNames.size() == 0) { return; }
// Try to move any remaining tables matching "SYSTEM\..*" into "SYSTEM:"
- if (tableNames.size() > 7) {
- logger.warn("Expected 7 system tables but found " + tableNames.size() + ":" + tableNames);
+ if (tableNames.size() > 8) {
+ logger.warn("Expected 8 system tables but found " + tableNames.size() + ":" + tableNames);
}
byte[] mappedSystemTable = SchemaUtil
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index bfe54ff..acd78ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -185,6 +185,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
return setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA);
}
+ protected String getTaskDDL() { // builds the CREATE TABLE statement for the task table
+ return setSystemDDLProperties(QueryConstants.CREATE_TASK_METADATA); // fills the template's format placeholders from props
+ }
+
private String setSystemDDLProperties(String ddl) {
return String.format(ddl,
props.getInt(DEFAULT_SYSTEM_MAX_VERSIONS_ATTRIB, QueryServicesOptions.DEFAULT_SYSTEM_MAX_VERSIONS),
@@ -388,6 +392,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
.executeUpdate(getMutexDDL());
} catch (NewerTableAlreadyExistsException ignore) {
}
+ try {
+ metaConnection.createStatement()
+ .executeUpdate(getTaskDDL());
+ } catch (NewerTableAlreadyExistsException ignore) {
+ }
} catch (SQLException e) {
sqlE = e;
} finally {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index bbff343..c45fec9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -347,8 +347,8 @@ public interface QueryConstants {
// Links from parent to child views are stored in a separate table for
// scalability
- public static final String CREATE_CHILD_LINK_METADATA = "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\""
- + SYSTEM_CHILD_LINK_TABLE + "\"(\n" +
+ public static final String CREATE_CHILD_LINK_METADATA = "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" +
+ SYSTEM_CHILD_LINK_TABLE + "\"(\n" +
// PK columns
TENANT_ID + " VARCHAR NULL," + TABLE_SCHEM + " VARCHAR NULL," + TABLE_NAME + " VARCHAR NOT NULL,"
+ COLUMN_NAME + " VARCHAR NULL," + COLUMN_FAMILY + " VARCHAR NULL," + LINK_TYPE + " UNSIGNED_TINYINT,\n"
@@ -371,4 +371,17 @@ public interface QueryConstants {
HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" +
PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
+ public static final String CREATE_TASK_METADATA = // DDL template for the task table; contains two %s placeholders
+ "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_TASK_TABLE + "\"(\n" +
+ // PK columns (every column declared here is part of the primary key)
+ TASK_TYPE + " UNSIGNED_TINYINT NOT NULL," +
+ TASK_TS + " TIMESTAMP NOT NULL," +
+ TENANT_ID + " VARCHAR NULL," +
+ TABLE_SCHEM + " VARCHAR NULL," +
+ TABLE_NAME + " VARCHAR NOT NULL,\n" +
+ "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TASK_TYPE + "," + TASK_TS + " ROW_TIMESTAMP," + TENANT_ID + "," + TABLE_SCHEM + "," + // TASK_TS is declared ROW_TIMESTAMP
+ TABLE_NAME + "))\n" +
+ HConstants.VERSIONS + "=%s,\n" + // first placeholder: VERSIONS value
+ HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" + // second placeholder: KEEP_DELETED_CELLS value
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; // table is created non-transactional
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 337bb05..d06f07c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -320,6 +320,12 @@ public interface QueryServices extends SQLCloseable {
public static final String SYSTEM_CATALOG_SPLITTABLE = "phoenix.system.catalog.splittable";
+ // Configuration keys for handling the tasks stored in table SYSTEM.TASK
+ // Time interval, in milliseconds, between periodic scans of table SYSTEM.TASK
+ public static final String TASK_HANDLING_INTERVAL_MS_ATTRIB = "phoenix.task.handling.interval.ms";
+ // Maximum time, in milliseconds, that a task may stay in table SYSTEM.TASK
+ public static final String TASK_HANDLING_MAX_INTERVAL_MS_ATTRIB = "phoenix.task.handling.maxInterval.ms";
+
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 35dbe3a..a6a73f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -340,6 +340,10 @@ public class QueryServicesOptions {
public static final int DEFAULT_UPDATE_CACHE_FREQUENCY = 0;
public static final int DEFAULT_SMALL_SCAN_THRESHOLD = 100;
+ // Default system task handling interval and maximum interval, both in milliseconds
+ public static final long DEFAULT_TASK_HANDLING_INTERVAL_MS = 60*1000; // 1 min
+ public static final long DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS = 30*60*1000; // 30 minutes
+
@SuppressWarnings("serial")
public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {
{
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 8cbf757..6dfe411 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -192,7 +192,36 @@ public interface PTable extends PMetaDataEntity {
return LinkType.values()[serializedValue-1];
}
}
-
+
+ public enum TaskType { // task kinds, each with a one-byte serialized code (presumably the TASK_TYPE column value; confirm)
+ DROP_CHILD_VIEWS((byte)1); // currently the only task type; serialized code 1
+
+ private final byte[] byteValue; // bytes of the constant's name (see constructor)
+ private final byte serializedValue; // one-byte code passed to the constructor
+
+ TaskType(byte serializedValue) {
+ this.serializedValue = serializedValue;
+ this.byteValue = Bytes.toBytes(this.name()); // encodes the constant's name, not the numeric code
+ }
+
+ public byte[] getBytes() { // returns the internal array itself, not a copy
+ return byteValue;
+ }
+
+ public byte getSerializedValue() { // returns the one-byte code for this constant
+ return this.serializedValue;
+ }
+ public static TaskType getDefault() { // the default task type is DROP_CHILD_VIEWS
+ return DROP_CHILD_VIEWS;
+ }
+ public static TaskType fromSerializedValue(byte serializedValue) { // maps a one-byte code back to its constant
+ if (serializedValue < 1 || serializedValue > TaskType.values().length) { // valid codes are 1..number of constants
+ throw new IllegalArgumentException("Invalid TaskType " + serializedValue);
+ }
+ return TaskType.values()[serializedValue-1]; // relies on declaration order matching codes 1..N
+ }
+ }
+
public enum ImmutableStorageScheme implements ColumnValueEncoderDecoderSupplier {
ONE_CELL_PER_COLUMN((byte)1) {
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index 4a758b7..23b1fcc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -62,10 +62,12 @@ public class StatisticsUtil {
DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME));
DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME));
DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME));
+ DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME));
DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true));
DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES,true));
DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES,true));
DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,true));
+ DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES,true));
}
private StatisticsUtil() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index ec2eb14..24a0e12 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -540,6 +540,12 @@ public class SchemaUtil {
|| Bytes.compareTo(tableName, SchemaUtil
.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES, true).getName()) == 0;
}
+
+ public static boolean isTaskTable(byte[] tableName) { // true when tableName matches either form of the task table name
+ return Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES) == 0 // first form: SYSTEM_TASK_NAME_BYTES as is
+ || Bytes.compareTo(tableName, SchemaUtil
+ .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES, true).getName()) == 0; // second form: physical name; 'true' presumably selects the namespace-mapped name (confirm)
+ }
public static boolean isChildLinkTable(byte[] tableName) {
return Bytes.compareTo(tableName, SYSTEM_CHILD_LINK_NAME_BYTES) == 0 || Bytes.compareTo(tableName,
@@ -550,6 +556,10 @@ public class SchemaUtil {
return PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME.equals(table.getName().getString());
}
+ public static boolean isTaskTable(PTable table) { // true when the table's name string equals SYSTEM_TASK_NAME
+ return PhoenixDatabaseMetaData.SYSTEM_TASK_NAME.equals(table.getName().getString()); // exact string equality against the table's name
+ }
+
public static boolean isMetaTable(PTable table) {
return PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(table.getSchemaName().getString()) && PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE.equals(table.getTableName().getString());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bbe8e20/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 5ca247b..e6e3936 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -620,6 +620,7 @@ public abstract class BaseTest {
conf.setInt("hbase.assignment.zkevent.workers", 5);
conf.setInt("hbase.assignment.threads.max", 5);
conf.setInt("hbase.catalogjanitor.interval", 5000);
+ conf.setInt(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 1000);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
return conf;