You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/12/16 18:49:10 UTC
[3/6] phoenix git commit: Sync 4.x-HBase-1.2 to master (Pedro Boado)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java
new file mode 100644
index 0000000..86b8bf1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointObserver.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+
+/**
+ * Observer hooks for Phoenix metadata operations (tables, schemas, functions
+ * and index state). Every hook declares {@link IOException}, so an
+ * implementation can signal a failure by throwing.
+ *
+ * NOTE(review): the {@code pre*} naming suggests each hook runs before the
+ * corresponding operation; the invoking code is not visible here - confirm.
+ */
+public interface MetaDataEndpointObserver extends Coprocessor {
+
+    /** Hook for a get-table request on {@code tableName}. */
+    void preGetTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName) throws IOException;
+
+    /** Hook for a create-table request; {@code tableType} distinguishes tables, views and indexes. */
+    void preCreateTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName, TableName parentPhysicalTableName,
+            PTableType tableType, Set<byte[]> familySet, Set<TableName> indexes) throws IOException;
+
+    /** Hook for a drop-table request, including the indexes attached to the table. */
+    void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName, TableName parentPhysicalTableName,
+            PTableType tableType, List<PTable> indexes) throws IOException;
+
+    /** Hook for an alter-table request. */
+    void preAlterTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String tableName, TableName physicalTableName, TableName parentPhysicalTableName,
+            PTableType type) throws IOException;
+
+    /** Hook for a get-schema request. */
+    void preGetSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
+            throws IOException;
+
+    /** Hook for a create-schema request. */
+    void preCreateSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
+            throws IOException;
+
+    /** Hook for a drop-schema request. */
+    void preDropSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
+            throws IOException;
+
+    /** Hook for a create-function request. */
+    void preCreateFunction(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String functionName) throws IOException;
+
+    /** Hook for a drop-function request. */
+    void preDropFunction(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String functionName) throws IOException;
+
+    /** Hook for a get-functions request. */
+    void preGetFunctions(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String functionName) throws IOException;
+
+    /** Hook for an index state change to {@code newState}. */
+    void preIndexUpdate(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+            String indexName, TableName physicalTableName, TableName parentPhysicalTableName,
+            PIndexState newState) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index c816549..af06235 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.coprocessor;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -163,9 +165,18 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props));
statsTable = env.getTable(
SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, props));
- if (UpgradeUtil.truncateStats(metaTable, statsTable)) {
- LOG.info("Stats are successfully truncated for upgrade 4.7!!");
- }
+ final HTableInterface mTable=metaTable;
+ final HTableInterface sTable=statsTable;
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ if (UpgradeUtil.truncateStats(mTable, sTable)) {
+ LOG.info("Stats are successfully truncated for upgrade 4.7!!");
+ }
+ return null;
+ }
+ });
+
} catch (Exception exception) {
LOG.warn("Exception while truncate stats..,"
+ " please check and delete stats manually inorder to get proper result with old client!!");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
new file mode 100644
index 0000000..a4bc857
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.AuthResult;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.MetaDataUtil;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.RpcCallback;
+
+public class PhoenixAccessController extends BaseMetaDataEndpointObserver {
+
+ private PhoenixMetaDataControllerEnvironment env;
+ // volatile: lazily initialised with double-checked locking in getAccessControllers(),
+ // so the fully built list must be safely published to other threads.
+ private volatile ArrayList<BaseMasterAndRegionObserver> accessControllers;
+ private boolean accessCheckEnabled;
+ private UserProvider userProvider;
+ public static final Log LOG = LogFactory.getLog(PhoenixAccessController.class);
+ private static final Log AUDITLOG =
+         LogFactory.getLog("SecurityLogger."+PhoenixAccessController.class.getName());
+
+ /**
+  * Returns the coprocessors loaded on this region's coprocessor host that are
+  * {@link BaseMasterAndRegionObserver}s and also implement
+  * {@link AccessControlService.Interface}. The list is computed once and cached.
+  *
+  * @return the cached list of access controller coprocessors (possibly empty)
+  * @throws IOException declared for callers; nothing in this method throws it directly
+  */
+ private List<BaseMasterAndRegionObserver> getAccessControllers() throws IOException {
+     ArrayList<BaseMasterAndRegionObserver> controllers = accessControllers;
+     if (controllers == null) {
+         synchronized (this) {
+             controllers = accessControllers;
+             if (controllers == null) {
+                 controllers = new ArrayList<BaseMasterAndRegionObserver>();
+                 RegionCoprocessorHost cpHost = this.env.getCoprocessorHost();
+                 List<BaseMasterAndRegionObserver> coprocessors = cpHost
+                         .findCoprocessors(BaseMasterAndRegionObserver.class);
+                 for (BaseMasterAndRegionObserver cp : coprocessors) {
+                     if (cp instanceof AccessControlService.Interface) {
+                         controllers.add(cp);
+                     }
+                 }
+                 // Publish only after the list is fully populated so that no
+                 // reader can ever observe a partially filled list.
+                 accessControllers = controllers;
+             }
+         }
+     }
+     return controllers;
+ }
+
+ /**
+  * Delegates a get-table request to every access controller's
+  * {@code preGetTableDescriptors} check for the physical table.
+  * Like the other hooks in this class, it does nothing when authorization
+  * checks are disabled.
+  *
+  * @throws IOException if an access controller rejects the request
+  */
+ @Override
+ public void preGetTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+         String tableName, TableName physicalTableName) throws IOException {
+     // Consistency with every other pre* hook here: honour the ACL switch.
+     if (!accessCheckEnabled) { return; }
+     for (BaseMasterAndRegionObserver observer : getAccessControllers()) {
+         observer.preGetTableDescriptors(new ObserverContext<MasterCoprocessorEnvironment>(),
+                 Lists.newArrayList(physicalTableName), Collections.<HTableDescriptor> emptyList());
+     }
+ }
+
+ /**
+  * Reads the ACL switch from configuration, validates and stores the
+  * environment, then sets up the user provider and the super-user lists.
+  *
+  * @throws IllegalArgumentException if {@code env} is not a
+  *         {@link PhoenixMetaDataControllerEnvironment}
+  */
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+     final Configuration configuration = env.getConfiguration();
+     this.accessCheckEnabled = configuration.getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
+             QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
+     if (!this.accessCheckEnabled) {
+         LOG.warn("PhoenixAccessController has been loaded with authorization checks disabled.");
+     }
+
+     final boolean validEnvironment = env instanceof PhoenixMetaDataControllerEnvironment;
+     if (!validEnvironment) {
+         throw new IllegalArgumentException(
+                 "Not a valid environment, should be loaded by PhoenixMetaDataControllerEnvironment");
+     }
+     this.env = (PhoenixMetaDataControllerEnvironment) env;
+
+     // User provider used as a fallback when no RPC request user is available.
+     this.userProvider = UserProvider.instantiate(env.getConfiguration());
+     // Loads configured super users/groups and adds the current process user.
+     Superusers.initialize(env.getConfiguration());
+ }
+
+ /** No-op: the body is empty, so this implementation does nothing on stop. */
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {}
+
+ /**
+  * Authorization checks for creating a table, view or index.
+  * <ul>
+  * <li>Non-view types: each access controller's {@code preCreateTable} is called with a
+  * descriptor built from {@code physicalTableName} and {@code familySet}.</li>
+  * <li>Views and indexes: the active user needs READ and EXEC on the parent physical table.</li>
+  * <li>Views: for every entry of {@code indexes} not already checked, any READ/EXEC the active
+  * user is found to lack is granted via {@code handleRequireAccessOnDependentTable}.</li>
+  * <li>Indexes whose physical table differs from the parent and is not a view index table:
+  * {@code authorizeOrGrantAccessToUsers} copies access from the parent to the index table.</li>
+  * </ul>
+  * Does nothing when authorization checks are disabled.
+  *
+  * @throws IOException if a check fails or permissions cannot be read
+  */
+ @Override
+ public void preCreateTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+ String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType,
+ Set<byte[]> familySet, Set<TableName> indexes) throws IOException {
+ if (!accessCheckEnabled) { return; }
+
+ if (tableType != PTableType.VIEW) {
+ // Build a minimal descriptor (name + column families) for the delegated check.
+ final HTableDescriptor htd = new HTableDescriptor(physicalTableName);
+ for (byte[] familyName : familySet) {
+ htd.addFamily(new HColumnDescriptor(familyName));
+ }
+ for (BaseMasterAndRegionObserver observer : getAccessControllers()) {
+ observer.preCreateTable(new ObserverContext<MasterCoprocessorEnvironment>(), htd, null);
+ }
+ }
+
+ // Index and view require read access on parent physical table.
+ Set<TableName> physicalTablesChecked = new HashSet<TableName>();
+ if (tableType == PTableType.VIEW || tableType == PTableType.INDEX) {
+ physicalTablesChecked.add(parentPhysicalTableName);
+ requireAccess("Create" + tableType, parentPhysicalTableName, Action.READ, Action.EXEC);
+ }
+
+ if (tableType == PTableType.VIEW) {
+
+ Action[] requiredActions = { Action.READ, Action.EXEC };
+ // NOTE(review): indexes is dereferenced without a null check - confirm callers never pass null.
+ for (TableName index : indexes) {
+ if (!physicalTablesChecked.add(index)) {
+ // skip check for local index as we have already check the ACLs above
+ // And for same physical table multiple times like view index table
+ continue;
+ }
+
+ User user = getActiveUser();
+ List<UserPermission> permissionForUser = getPermissionForUser(
+ getUserPermissions(index.getNameAsString()), Bytes.toBytes(user.getShortName()));
+ Set<Action> requireAccess = new HashSet<>();
+ Set<Action> accessExists = new HashSet<>();
+ if (permissionForUser != null) {
+ // NOTE(review): an action is marked missing as soon as ANY one permission entry does
+ // not imply it, even if another entry does - verify this is intended.
+ for (UserPermission userPermission : permissionForUser) {
+ for (Action action : Arrays.asList(requiredActions)) {
+ if (!userPermission.implies(action)) {
+ requireAccess.add(action);
+ }
+ }
+ }
+ if (!requireAccess.isEmpty()) {
+ // Collect the actions the user already holds so the grant can include them.
+ for (UserPermission userPermission : permissionForUser) {
+ accessExists.addAll(Arrays.asList(userPermission.getActions()));
+ }
+
+ }
+ } else {
+ // No permission entries for this user: every required action is missing.
+ requireAccess.addAll(Arrays.asList(requiredActions));
+ }
+ if (!requireAccess.isEmpty()) {
+ byte[] indexPhysicalTable = index.getName();
+ handleRequireAccessOnDependentTable("Create" + tableType, user.getName(),
+ TableName.valueOf(indexPhysicalTable), tableName, requireAccess, accessExists);
+ }
+ }
+
+ }
+
+ if (tableType == PTableType.INDEX) {
+ // All the users who have READ access on data table should have access to Index table as well.
+ // WRITE is needed for the index updates done by the user who has WRITE access on data table.
+ // CREATE is needed during the drop of the table.
+ // We are doing this because existing user while querying data table should not see access denied for the
+ // new indexes.
+ // TODO: confirm whether granting permission from coprocessor is a security leak.(currently it is done if
+ // automatic grant is enabled explicitly by user in configuration
+ // skip check for local index
+ // NOTE(review): parentPhysicalTableName is dereferenced here without a null check.
+ if (physicalTableName != null && !parentPhysicalTableName.equals(physicalTableName)
+ && !MetaDataUtil.isViewIndex(physicalTableName.getNameAsString())) {
+ authorizeOrGrantAccessToUsers("Create" + tableType, parentPhysicalTableName,
+ Arrays.asList(Action.READ, Action.WRITE, Action.CREATE, Action.EXEC, Action.ADMIN),
+ physicalTableName);
+ }
+ }
+ }
+
+
+ /**
+  * Writes an audit entry and grants {@code userName} the union of
+  * {@code requireAccess} and {@code accessExists} on {@code dependentTable}.
+  *
+  * @param request        label of the operation, used as the audit prefix
+  * @param userName       user receiving the grant
+  * @param dependentTable table on which the grant is made
+  * @param requestTable   name of the table reported in the audit entry
+  * @param requireAccess  actions found to be missing
+  * @param accessExists   actions the user already holds
+  * @throws IOException if the grant fails
+  */
+ public void handleRequireAccessOnDependentTable(String request, String userName, TableName dependentTable,
+         String requestTable, Set<Action> requireAccess, Set<Action> accessExists) throws IOException {
+
+     final Set<Action> actionsToGrant = new HashSet<Action>(requireAccess);
+     actionsToGrant.addAll(accessExists);
+
+     final String grantDetails = authString(userName, dependentTable, requireAccess);
+     AUDITLOG.info(request + ": Automatically granting access to index table during creation of view:"
+             + requestTable + grantDetails);
+
+     final Action[] actionArray = actionsToGrant.toArray(new Action[actionsToGrant.size()]);
+     grantPermissions(userName, dependentTable.getName(), actionArray);
+ }
+
+ /**
+  * Grants {@code actions} on {@code table} to {@code toUser}, running as the
+  * login user over a short-lived connection.
+  *
+  * @param toUser  user receiving the permissions
+  * @param table   name of the table, as bytes
+  * @param actions actions to grant
+  * @throws IOException if the grant fails; failures are wrapped in a
+  *         {@link DoNotRetryIOException}
+  */
+ private void grantPermissions(final String toUser, final byte[] table, final Action... actions) throws IOException {
+     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+         @Override
+         public Void run() throws Exception {
+             try (Connection conn = ConnectionFactory.createConnection(env.getConfiguration())) {
+                 AccessControlClient.grant(conn, TableName.valueOf(table), toUser , null, null,
+                         actions);
+             } catch (Throwable e) {
+                 // The exception was previously constructed but never thrown,
+                 // which silently swallowed every grant failure.
+                 throw new DoNotRetryIOException(e);
+             }
+             return null;
+         }
+     });
+ }
+
+ /**
+  * For every permission entry on {@code fromTable}, grants the same principal
+  * on {@code toTable} each action from {@code requiredActionsOnTable} that the
+  * entry implies but that the principal does not yet have on {@code toTable}.
+  * Group principals are not granted anything; a warning is audited instead.
+  * Runs as the login user.
+  *
+  * @param request                label of the operation, used in the audit entry
+  * @param fromTable              table whose permission entries are copied
+  * @param requiredActionsOnTable candidate actions to carry over
+  * @param toTable                table receiving the grants
+  * @throws IOException if permissions cannot be read or granted
+  */
+ private void authorizeOrGrantAccessToUsers(final String request, final TableName fromTable,
+         final List<Action> requiredActionsOnTable, final TableName toTable)
+         throws IOException {
+     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+         @Override
+         public Void run() throws IOException {
+             // No connection is opened here: getUserPermissions and the grant
+             // path each manage their own connection.
+             List<UserPermission> userPermissions = getUserPermissions(fromTable.getNameAsString());
+             List<UserPermission> permissionsOnTheTable = getUserPermissions(toTable.getNameAsString());
+             if (userPermissions == null) {
+                 return null;
+             }
+             for (UserPermission userPermission : userPermissions) {
+                 List<UserPermission> permsToTable = getPermissionForUser(permissionsOnTheTable,
+                         userPermission.getUser());
+
+                 // Actions implied on the source table but missing on the target table.
+                 Set<Action> requireAccess = new HashSet<Action>();
+                 for (Action action : requiredActionsOnTable) {
+                     if (!userPermission.implies(action)) {
+                         continue;
+                     }
+                     boolean haveAccess = false;
+                     if (permsToTable != null) {
+                         for (UserPermission permToTable : permsToTable) {
+                             if (permToTable.implies(action)) {
+                                 haveAccess = true;
+                                 break;
+                             }
+                         }
+                     }
+                     if (!haveAccess) {
+                         requireAccess.add(action);
+                     }
+                 }
+                 if (requireAccess.isEmpty()) {
+                     continue;
+                 }
+
+                 String principal = Bytes.toString(userPermission.getUser());
+                 if (AuthUtil.isGroupPrincipal(principal)) {
+                     AUDITLOG.warn("Users of GROUP:" + principal
+                             + " will not have following access " + requireAccess
+                             + " to the newly created index " + toTable
+                             + ", Automatic grant is not yet allowed on Groups");
+                     continue;
+                 }
+
+                 // Append access to already existing access for the user
+                 Set<Action> accessExists = new HashSet<Action>();
+                 if (permsToTable != null) {
+                     for (UserPermission permToTable : permsToTable) {
+                         accessExists.addAll(Arrays.asList(permToTable.getActions()));
+                     }
+                 }
+                 handleRequireAccessOnDependentTable(request, principal,
+                         toTable, toTable.getNameAsString(), requireAccess, accessExists);
+             }
+             return null;
+         }
+     });
+ }
+
+ private List<UserPermission> getPermissionForUser(List<UserPermission> perms, byte[] user) {
+ if (perms != null) {
+ // get list of permissions for the user as multiple implementation of AccessControl coprocessors can give
+ // permissions for same users
+ List<UserPermission> permissions = new ArrayList<>();
+ for (UserPermission p : perms) {
+ if (Bytes.equals(p.getUser(),user)){
+ permissions.add(p);
+ }
+ }
+ if (!permissions.isEmpty()){
+ return permissions;
+ }
+ }
+ return null;
+ }
+
+ /**
+  * Authorization checks for dropping a table, view or index. Each access
+  * controller's {@code preDeleteTable} is called for the physical table
+  * (skipped for views) and for every index; views and indexes additionally
+  * require READ and EXEC on the parent physical table.
+  * Does nothing when authorization checks are disabled.
+  */
+ @Override
+ public void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+         String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType,
+         List<PTable> indexes) throws IOException {
+     if (!accessCheckEnabled) {
+         return;
+     }
+
+     final boolean isView = tableType == PTableType.VIEW;
+     for (BaseMasterAndRegionObserver controller : getAccessControllers()) {
+         if (!isView) {
+             controller.preDeleteTable(new ObserverContext<MasterCoprocessorEnvironment>(), physicalTableName);
+         }
+         if (indexes == null) {
+             continue;
+         }
+         for (PTable indexTable : indexes) {
+             TableName indexPhysicalName = TableName.valueOf(indexTable.getPhysicalName().getBytes());
+             controller.preDeleteTable(new ObserverContext<MasterCoprocessorEnvironment>(), indexPhysicalName);
+         }
+     }
+
+     // Same permission as the one checked when the view/index was created.
+     if (isView || tableType == PTableType.INDEX) {
+         requireAccess("Drop "+tableType, parentPhysicalTableName, Action.READ, Action.EXEC);
+     }
+ }
+
+ /**
+  * Authorization checks for altering a table or view. Non-view types are
+  * delegated to each access controller's {@code preModifyTable}; views
+  * require READ and EXEC on the parent physical table.
+  * Does nothing when authorization checks are disabled.
+  */
+ @Override
+ public void preAlterTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+         String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType) throws IOException {
+     if (!accessCheckEnabled) {
+         return;
+     }
+     final boolean isView = tableType == PTableType.VIEW;
+     // The controller list is resolved even for views, exactly as before.
+     for (BaseMasterAndRegionObserver controller : getAccessControllers()) {
+         if (isView) {
+             continue;
+         }
+         controller.preModifyTable(new ObserverContext<MasterCoprocessorEnvironment>(), physicalTableName,
+                 new HTableDescriptor(physicalTableName));
+     }
+     if (isView) {
+         requireAccess("Alter "+tableType, parentPhysicalTableName, Action.READ, Action.EXEC);
+     }
+ }
+
+ /**
+  * Delegates a get-schema request to each access controller's
+  * {@code preListNamespaceDescriptors}, passing a namespace descriptor named
+  * after the schema. Does nothing when authorization checks are disabled.
+  */
+ @Override
+ public void preGetSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
+         throws IOException {
+     if (!accessCheckEnabled) {
+         return;
+     }
+     for (BaseMasterAndRegionObserver controller : getAccessControllers()) {
+         ObserverContext<MasterCoprocessorEnvironment> masterCtx =
+                 new ObserverContext<MasterCoprocessorEnvironment>();
+         NamespaceDescriptor namespace = NamespaceDescriptor.create(schemaName).build();
+         controller.preListNamespaceDescriptors(masterCtx, Arrays.asList(namespace));
+     }
+ }
+
+ /**
+  * Delegates a create-schema request to each access controller's
+  * {@code preCreateNamespace}, passing a namespace descriptor named after
+  * the schema. Does nothing when authorization checks are disabled.
+  */
+ @Override
+ public void preCreateSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
+         throws IOException {
+     if (!accessCheckEnabled) {
+         return;
+     }
+     for (BaseMasterAndRegionObserver controller : getAccessControllers()) {
+         ObserverContext<MasterCoprocessorEnvironment> masterCtx =
+                 new ObserverContext<MasterCoprocessorEnvironment>();
+         NamespaceDescriptor namespace = NamespaceDescriptor.create(schemaName).build();
+         controller.preCreateNamespace(masterCtx, namespace);
+     }
+ }
+
+ /**
+  * Delegates a drop-schema request to each access controller's
+  * {@code preDeleteNamespace}. Does nothing when authorization checks are
+  * disabled.
+  */
+ @Override
+ public void preDropSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
+         throws IOException {
+     if (!accessCheckEnabled) {
+         return;
+     }
+     for (BaseMasterAndRegionObserver controller : getAccessControllers()) {
+         ObserverContext<MasterCoprocessorEnvironment> masterCtx =
+                 new ObserverContext<MasterCoprocessorEnvironment>();
+         controller.preDeleteNamespace(masterCtx, schemaName);
+     }
+ }
+
+ @Override
+ public void preIndexUpdate(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+ String indexName, TableName physicalTableName, TableName parentPhysicalTableName, PIndexState newState)
+ throws IOException {
+ if (!accessCheckEnabled) { return; }
+ for (BaseMasterAndRegionObserver observer : getAccessControllers()) {
+ observer.preModifyTable(new ObserverContext<MasterCoprocessorEnvironment>(), physicalTableName,
+ new HTableDescriptor(physicalTableName));
+ }
+ // Check for read access in case of rebuild
+ if (newState == PIndexState.BUILDING) {
+ requireAccess("Rebuild:", parentPhysicalTableName, Action.READ, Action.EXEC);
+ }
+ }
+
+ /**
+  * Collects the permission entries for {@code tableName} from every access
+  * controller, running as the login user. The stock HBase
+  * {@code AccessController} is queried through {@link AccessControlClient};
+  * any other controller is invoked directly through its
+  * {@link AccessControlService.Interface}.
+  *
+  * @param tableName name of the table whose permissions are wanted
+  * @return all permission entries found (possibly empty)
+  * @throws IOException if the lookup fails
+  */
+ private List<UserPermission> getUserPermissions(final String tableName) throws IOException {
+     return User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
+         @Override
+         public List<UserPermission> run() throws Exception {
+             final List<UserPermission> userPermissions = new ArrayList<UserPermission>();
+             // Go through the lazy accessor: the raw field is still null when no
+             // hook has resolved the controllers yet, which used to cause an NPE.
+             final List<BaseMasterAndRegionObserver> controllers = getAccessControllers();
+             try (Connection connection = ConnectionFactory.createConnection(env.getConfiguration())) {
+                 for (BaseMasterAndRegionObserver service : controllers) {
+                     if (service.getClass().getName().equals(org.apache.hadoop.hbase.security.access.AccessController.class.getName())) {
+                         userPermissions.addAll(AccessControlClient.getUserPermissions(connection, tableName));
+                     } else {
+                         AccessControlProtos.GetUserPermissionsRequest.Builder builder = AccessControlProtos.GetUserPermissionsRequest
+                                 .newBuilder();
+                         builder.setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(tableName)));
+                         builder.setType(AccessControlProtos.Permission.Type.Table);
+                         AccessControlProtos.GetUserPermissionsRequest request = builder.build();
+
+                         PayloadCarryingRpcController controller = ((ClusterConnection)connection)
+                                 .getRpcControllerFactory().newController();
+                         ((AccessControlService.Interface)service).getUserPermissions(controller, request,
+                                 new RpcCallback<AccessControlProtos.GetUserPermissionsResponse>() {
+                                     @Override
+                                     public void run(AccessControlProtos.GetUserPermissionsResponse message) {
+                                         if (message != null) {
+                                             for (AccessControlProtos.UserPermission perm : message
+                                                     .getUserPermissionList()) {
+                                                 userPermissions.add(ProtobufUtil.toUserPermission(perm));
+                                             }
+                                         }
+                                     }
+                                 });
+                     }
+                 }
+             } catch (Throwable e) {
+                 // AccessControlClient declares Throwable; rethrow with the original type where possible.
+                 if (e instanceof Exception) {
+                     throw (Exception) e;
+                 } else if (e instanceof Error) {
+                     throw (Error) e;
+                 }
+                 throw new Exception(e);
+             }
+             return userPermissions;
+         }
+     });
+ }
+
+ /**
+  * Authorizes that the current user has all the given permissions for the
+  * given table. Every individual check is passed to {@code logResult}.
+  * An empty {@code permissions} list is vacuously allowed.
+  *
+  * @param request label of the operation, used in the audit entries
+  * @param tableName Table requested
+  * @param permissions actions the active user must hold on the table
+  * @throws IOException if obtaining the current user or the table permissions fails
+  * @throws AccessDeniedException if user has no authorization
+  */
+ private void requireAccess(String request, TableName tableName, Action... permissions) throws IOException {
+     User user = getActiveUser();
+     // The table's permission entries do not depend on the action being
+     // checked, so fetch them once instead of once per action.
+     List<UserPermission> tablePermissions = getUserPermissions(tableName.getNameAsString());
+     List<Action> requiredAccess = new ArrayList<Action>();
+     for (Action permission : permissions) {
+         AuthResult result;
+         if (hasAccess(tablePermissions, tableName, permission, user)) {
+             result = AuthResult.allow(request, "Table permission granted", user, permission, tableName, null, null);
+         } else {
+             result = AuthResult.deny(request, "Insufficient permissions", user, permission, tableName, null, null);
+             requiredAccess.add(permission);
+         }
+         logResult(result);
+     }
+     // Decide on the collected list rather than on a possibly null result object.
+     if (!requiredAccess.isEmpty()) {
+         throw new AccessDeniedException("Insufficient permissions "
+                 + authString(user.getName(), tableName, new HashSet<Permission.Action>(Arrays.asList(permissions))));
+     }
+ }
+
+ /**
+  * Checks if the user has access to the table for the specified action.
+  * Super users always have access; otherwise the user's own entries and the
+  * entries of each of the user's groups are consulted.
+  *
+  * @param perms All table permissions
+  * @param table tablename, used only in the debug message
+  * @param action action for access is required
+  * @param user user whose access is checked
+  * @return true if the user has access to the table for specified action, false otherwise
+  */
+ private boolean hasAccess(List<UserPermission> perms, TableName table, Permission.Action action, User user) {
+     if (Superusers.isSuperUser(user)){
+         return true;
+     }
+     if (perms == null) {
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("No permissions found for table=" + table);
+         }
+         return false;
+     }
+     // Bytes.toBytes encodes as UTF-8, matching how user names are encoded
+     // elsewhere in this class; String.getBytes() depends on the platform charset.
+     List<UserPermission> permissionsForUser = getPermissionForUser(perms, Bytes.toBytes(user.getShortName()));
+     if (permissionsForUser != null) {
+         for (UserPermission permissionForUser : permissionsForUser) {
+             if (permissionForUser.implies(action)) { return true; }
+         }
+     }
+     String[] groupNames = user.getGroupNames();
+     if (groupNames != null) {
+         for (String group : groupNames) {
+             List<UserPermission> groupPerms = getPermissionForUser(perms,
+                     Bytes.toBytes(AuthUtil.toGroupEntry(group)));
+             if (groupPerms == null) {
+                 continue;
+             }
+             for (UserPermission permissionForGroup : groupPerms) {
+                 if (permissionForGroup.implies(action)) { return true; }
+             }
+         }
+     }
+     return false;
+ }
+
+ private User getActiveUser() throws IOException {
+ User user = RpcServer.getRequestUser();
+ if (user == null) {
+ // for non-rpc handling, fallback to system user
+ user = userProvider.getCurrent();
+ }
+ return user;
+ }
+
+ private void logResult(AuthResult result) {
+ if (AUDITLOG.isTraceEnabled()) {
+ InetAddress remoteAddr = RpcServer.getRemoteAddress();
+ AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") + " for user "
+ + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") + "; reason: "
+ + result.getReason() + "; remote address: " + (remoteAddr != null ? remoteAddr : "") + "; request: "
+ + result.getRequest() + "; context: " + result.toContextString());
+ }
+ }
+
+ /**
+  * Process-wide registry of super users and super groups, loaded from
+  * configuration. State is static and may be re-initialized while other
+  * threads query it, so the lists are built privately and then published
+  * through volatile fields as unmodifiable snapshots.
+  */
+ private static final class Superusers {
+     private static final Log LOG = LogFactory.getLog(Superusers.class);
+
+     /** Configuration key for superusers */
+     public static final String SUPERUSER_CONF_KEY = org.apache.hadoop.hbase.security.Superusers.SUPERUSER_CONF_KEY; // Not getting a name
+
+     private static volatile List<String> superUsers;
+     private static volatile List<String> superGroups;
+     private static volatile User systemUser;
+
+     private Superusers(){}
+
+     /**
+      * Loads the super users and super groups from configuration and adds the
+      * current process user as a super user. May be called repeatedly; each
+      * call replaces the previous lists.
+      * @param conf configuration to load users from
+      * @throws IOException if unable to initialize lists of superusers or super groups
+      * @throws IllegalStateException if current user is null
+      */
+     public static void initialize(Configuration conf) throws IOException {
+         User currentSystemUser = User.getCurrent();
+         if (currentSystemUser == null) {
+             throw new IllegalStateException("Unable to obtain the current user, "
+                     + "authorization checks for internal operations will not work correctly!");
+         }
+
+         if (LOG.isTraceEnabled()) {
+             LOG.trace("Current user name is " + currentSystemUser.getShortName());
+         }
+         List<String> users = new ArrayList<>();
+         List<String> groups = new ArrayList<>();
+         String[] superUserList = conf.getStrings(SUPERUSER_CONF_KEY, new String[0]);
+         for (String name : superUserList) {
+             if (AuthUtil.isGroupPrincipal(name)) {
+                 groups.add(AuthUtil.getGroupName(name));
+             } else {
+                 users.add(name);
+             }
+         }
+         users.add(currentSystemUser.getShortName());
+
+         // Publish fully built snapshots. superUsers is written last because
+         // isSuperUser() treats a non-null superUsers as "initialized".
+         systemUser = currentSystemUser;
+         superGroups = Collections.unmodifiableList(groups);
+         superUsers = Collections.unmodifiableList(users);
+     }
+
+     /**
+      * @return true if current user is a super user (whether as user running process,
+      * declared as individual superuser or member of supergroup), false otherwise.
+      * @param user to check
+      * @throws IllegalStateException if lists of superusers/super groups
+      * haven't been initialized properly
+      */
+     public static boolean isSuperUser(User user) {
+         List<String> users = superUsers;
+         if (users == null) {
+             throw new IllegalStateException("Super users/super groups lists"
+                     + " haven't been initialized properly.");
+         }
+         if (users.contains(user.getShortName())) {
+             return true;
+         }
+
+         List<String> groups = superGroups;
+         for (String group : user.getGroupNames()) {
+             if (groups.contains(group)) {
+                 return true;
+             }
+         }
+         return false;
+     }
+
+     /** @return unmodifiable snapshot of the super user names, or null before initialization */
+     public static List<String> getSuperUsers() {
+         return superUsers;
+     }
+
+     /** @return the process user captured by the last initialization, or null before it */
+     public static User getSystemUser() {
+         return systemUser;
+     }
+ }
+
+ /**
+  * Formats the user, scope and actions of an authorization decision for
+  * log and exception messages, e.g. {@code " (user=bob, scope=ns:t, actions=[READ, EXEC])"}.
+  *
+  * @param user    user name; {@code null} is rendered as {@code UNKNOWN}
+  * @param table   table; {@code null} is rendered as {@code GLOBAL}
+  * @param actions actions; {@code null} is rendered as an empty value
+  * @return the formatted string, starting with a space
+  */
+ public String authString(String user, TableName table, Set<Action> actions) {
+     // Check for null before calling size(); previously the null check came
+     // after the dereference and could never take effect.
+     final boolean plural = actions != null && actions.size() > 1;
+     StringBuilder sb = new StringBuilder();
+     sb.append(" (user=").append(user != null ? user : "UNKNOWN").append(", ");
+     sb.append("scope=").append(table == null ? "GLOBAL" : table.getNameWithNamespaceInclAsString()).append(", ");
+     sb.append(plural ? "actions=" : "action=").append(actions != null ? actions.toString() : "")
+             .append(")");
+     return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
new file mode 100644
index 0000000..15b0020
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+
+public class PhoenixMetaDataCoprocessorHost
+ extends CoprocessorHost<PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment> {
+ /** Region environment this host was created with; handed to every environment it creates. */
+ private RegionCoprocessorEnvironment env;
+ /** Configuration key under which the observer classes to load are looked up. */
+ public static final String PHOENIX_META_DATA_COPROCESSOR_CONF_KEY =
+         "hbase.coprocessor.phoenix.classes";
+ /** Value stored under the key above when ACLs are enabled and nothing is configured. */
+ public static final String DEFAULT_PHOENIX_META_DATA_COPROCESSOR_CONF_KEY =
+         "org.apache.phoenix.coprocessor.PhoenixAccessController";
+
+ /**
+  * Creates the host on top of the given region environment and loads the
+  * coprocessors configured under {@link #PHOENIX_META_DATA_COPROCESSOR_CONF_KEY}.
+  * When Phoenix ACLs are enabled and that key has no value, the default class
+  * name is written into the environment's configuration first.
+  *
+  * @param env region coprocessor environment supplying the configuration
+  */
+ public PhoenixMetaDataCoprocessorHost(RegionCoprocessorEnvironment env) {
+     super(null);
+     this.env = env;
+     this.conf = env.getConfiguration();
+
+     final boolean aclsEnabled = this.conf.getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
+             QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
+     final boolean nothingConfigured = this.conf.get(PHOENIX_META_DATA_COPROCESSOR_CONF_KEY) == null;
+     if (nothingConfigured && aclsEnabled) {
+         this.conf.set(PHOENIX_META_DATA_COPROCESSOR_CONF_KEY,
+                 DEFAULT_PHOENIX_META_DATA_COPROCESSOR_CONF_KEY);
+     }
+     loadSystemCoprocessors(conf, PHOENIX_META_DATA_COPROCESSOR_CONF_KEY);
+ }
+
+ private static abstract class CoprocessorOperation<T extends CoprocessorEnvironment> extends ObserverContext<T> {
+ abstract void call(MetaDataEndpointObserver oserver, ObserverContext<T> ctx) throws IOException;
+
+ public void postEnvCall(T env) {}
+ }
+
+ /**
+ * Runs the given operation against every loaded coprocessor whose instance is a
+ * {@link MetaDataEndpointObserver}.
+ *
+ * @param ctx operation to run; the same object is passed to each observer as its context
+ * @return false if {@code ctx} is null, otherwise true if the context reported
+ * {@code shouldBypass()} after any of the observer calls
+ * @throws IOException if handleCoprocessorThrowable rethrows a failure raised by an observer
+ */
+ private boolean execOperation(
+ final CoprocessorOperation<PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment> ctx)
+ throws IOException {
+ if (ctx == null) return false;
+ boolean bypass = false;
+ for (PhoenixMetaDataControllerEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof MetaDataEndpointObserver) {
+ ctx.prepare(env);
+ // Run the observer with the environment's class loader as the thread context
+ // class loader, and always put the caller's loader back afterwards.
+ Thread currentThread = Thread.currentThread();
+ ClassLoader cl = currentThread.getContextClassLoader();
+ try {
+ currentThread.setContextClassLoader(env.getClassLoader());
+ ctx.call((MetaDataEndpointObserver)env.getInstance(), ctx);
+ } catch (Throwable e) {
+ // Any failure is routed through the handler, which decides whether to rethrow.
+ handleCoprocessorThrowable(env, e);
+ } finally {
+ currentThread.setContextClassLoader(cl);
+ }
+ bypass |= ctx.shouldBypass();
+ if (ctx.shouldComplete()) {
+ // Stops visiting further coprocessors; postEnvCall below is not reached for this env.
+ break;
+ }
+ }
+ // Invoked for every visited environment, including ones that are not observers.
+ ctx.postEnvCall(env);
+ }
+ return bypass;
+ }
+
+ @Override
+ protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e) throws IOException {
+ if (e instanceof IOException) {
+ if (e.getCause() instanceof DoNotRetryIOException) { throw (IOException)e.getCause(); }
+ }
+ super.handleCoprocessorThrowable(env, e);
+ }
+
+ /**
+  * Environment of a single coprocessor. All region-level lookups are forwarded
+  * to the {@link RegionCoprocessorEnvironment} supplied at construction time.
+  */
+ static class PhoenixMetaDataControllerEnvironment extends CoprocessorHost.Environment
+         implements RegionCoprocessorEnvironment {
+
+     /** Region environment that answers every delegated call below. */
+     private final RegionCoprocessorEnvironment regionEnv;
+
+     public PhoenixMetaDataControllerEnvironment(RegionCoprocessorEnvironment env, Coprocessor instance,
+             int priority, int sequence, Configuration conf) {
+         super(instance, priority, sequence, conf);
+         this.regionEnv = env;
+     }
+
+     @Override
+     public RegionServerServices getRegionServerServices() {
+         return regionEnv.getRegionServerServices();
+     }
+
+     /** @return the coprocessor host of the region exposed by the wrapped environment */
+     public RegionCoprocessorHost getCoprocessorHost() {
+         return regionEnv.getRegion().getCoprocessorHost();
+     }
+
+     @Override
+     public Region getRegion() {
+         return regionEnv.getRegion();
+     }
+
+     @Override
+     public HRegionInfo getRegionInfo() {
+         return regionEnv.getRegionInfo();
+     }
+
+     @Override
+     public ConcurrentMap<String, Object> getSharedData() {
+         return regionEnv.getSharedData();
+     }
+ }
+
+ /**
+ * Creates the environment for one coprocessor instance, handing it the region
+ * environment this host was constructed with. {@code implClass} is not used.
+ */
+ @Override
+ public PhoenixMetaDataControllerEnvironment createEnvironment(Class<?> implClass, Coprocessor instance,
+ int priority, int sequence, Configuration conf) {
+ return new PhoenixMetaDataControllerEnvironment(env, instance, priority, sequence, conf);
+ }
+
+ /**
+  * Dispatches the {@code preGetTable} hook to the loaded observers.
+  *
+  * @throws IOException if an observer fails or rejects the call
+  */
+ public void preGetTable(final String tenantId, final String tableName, final TableName physicalTableName)
+         throws IOException {
+     final CoprocessorOperation<PhoenixMetaDataControllerEnvironment> operation =
+             new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() {
+                 @Override
+                 public void call(MetaDataEndpointObserver target,
+                         ObserverContext<PhoenixMetaDataControllerEnvironment> context) throws IOException {
+                     target.preGetTable(context, tenantId, tableName, physicalTableName);
+                 }
+             };
+     execOperation(operation);
+ }
+
+ public void preCreateTable(final String tenantId, final String tableName, final TableName physicalTableName,
+ final TableName parentPhysicalTableName, final PTableType tableType, final Set<byte[]> familySet, final Set<TableName> indexes)
+ throws IOException {
+ execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() {
+ @Override
+ public void call(MetaDataEndpointObserver observer,
+ ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException {
+ observer.preCreateTable(ctx, tenantId, tableName, physicalTableName, parentPhysicalTableName, tableType,
+ familySet, indexes);
+ }
+ });
+ }
+
+ public void preDropTable(final String tenantId, final String tableName, final TableName physicalTableName,
+ final TableName parentPhysicalTableName, final PTableType tableType, final List<PTable> indexes) throws IOException {
+ execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() {
+ @Override
+ public void call(MetaDataEndpointObserver observer,
+ ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException {
+ observer.preDropTable(ctx, tenantId, tableName, physicalTableName, parentPhysicalTableName, tableType, indexes);
+ }
+ });
+ }
+
+ public void preAlterTable(final String tenantId, final String tableName, final TableName physicalTableName,
+ final TableName parentPhysicalTableName, final PTableType type) throws IOException {
+ execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() {
+ @Override
+ public void call(MetaDataEndpointObserver observer,
+ ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException {
+ observer.preAlterTable(ctx, tenantId, tableName, physicalTableName, parentPhysicalTableName, type);
+ }
+ });
+ }
+
+ /**
+  * Dispatches the {@code preGetSchema} hook to the loaded observers.
+  *
+  * @throws IOException if an observer fails or rejects the call
+  */
+ public void preGetSchema(final String schemaName) throws IOException {
+     final CoprocessorOperation<PhoenixMetaDataControllerEnvironment> operation =
+             new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() {
+                 @Override
+                 public void call(MetaDataEndpointObserver target,
+                         ObserverContext<PhoenixMetaDataControllerEnvironment> context) throws IOException {
+                     target.preGetSchema(context, schemaName);
+                 }
+             };
+     execOperation(operation);
+ }
+
+ /**
+  * Dispatches the {@code preCreateSchema} hook to the loaded observers.
+  *
+  * @throws IOException if an observer fails or rejects the call
+  */
+ public void preCreateSchema(final String schemaName) throws IOException {
+     final CoprocessorOperation<PhoenixMetaDataControllerEnvironment> operation =
+             new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() {
+                 @Override
+                 public void call(MetaDataEndpointObserver target,
+                         ObserverContext<PhoenixMetaDataControllerEnvironment> context) throws IOException {
+                     target.preCreateSchema(context, schemaName);
+                 }
+             };
+     execOperation(operation);
+ }
+
+ /**
+  * Dispatches the {@code preDropSchema} hook to the loaded observers.
+  *
+  * @throws IOException if an observer fails or rejects the call
+  */
+ public void preDropSchema(final String schemaName) throws IOException {
+     final CoprocessorOperation<PhoenixMetaDataControllerEnvironment> operation =
+             new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() {
+                 @Override
+                 public void call(MetaDataEndpointObserver target,
+                         ObserverContext<PhoenixMetaDataControllerEnvironment> context) throws IOException {
+                     target.preDropSchema(context, schemaName);
+                 }
+             };
+     execOperation(operation);
+ }
+
+ public void preIndexUpdate(final String tenantId, final String indexName, final TableName physicalTableName,
+ final TableName parentPhysicalTableName, final PIndexState newState) throws IOException {
+ execOperation(new CoprocessorOperation<PhoenixMetaDataControllerEnvironment>() {
+ @Override
+ public void call(MetaDataEndpointObserver observer,
+ ObserverContext<PhoenixMetaDataControllerEnvironment> ctx) throws IOException {
+ observer.preIndexUpdate(ctx, tenantId, indexName, physicalTableName, parentPhysicalTableName, newState);
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index e51fd9f..2301c32 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -229,6 +229,7 @@ public enum SQLExceptionCode {
return new TableAlreadyExistsException(info.getSchemaName(), info.getTableName());
}
}),
+ TABLES_NOT_IN_SYNC(1140, "42M05", "Tables not in sync for some properties."),
// Syntax error
TYPE_NOT_SUPPORTED_FOR_OPERATOR(1014, "42Y01", "The operator does not support the operand type."),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 4c29abe..369769e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.SerialIterators;
import org.apache.phoenix.iterate.SpoolingResultIterator;
import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
@@ -67,6 +68,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,7 +114,33 @@ public class AggregatePlan extends BaseQueryPlan {
public Expression getHaving() {
return having;
}
-
+
+ /**
+  * Estimates the cost of this aggregate plan from the estimated bytes to scan,
+  * adding an order-by cost when the plan has order-by expressions.
+  *
+  * @return {@code Cost.UNKNOWN} when no byte estimate is available, otherwise the estimate
+  */
+ @Override
+ public Cost getCost() {
+     Long bytesToScan;
+     try {
+         bytesToScan = getEstimatedBytesToScan();
+     } catch (SQLException e) {
+         // No estimate could be obtained; treated like a missing estimate below.
+         bytesToScan = null;
+     }
+     if (bytesToScan == null) {
+         return Cost.UNKNOWN;
+     }
+
+     int parallelism = CostUtil.estimateParallelLevel(
+             true, context.getConnection().getQueryServices());
+     Cost total = CostUtil.estimateAggregateCost(bytesToScan,
+             groupBy, aggregators.getEstimatedByteSize(), parallelism);
+     boolean ordered = !orderBy.getOrderByExpressions().isEmpty();
+     if (ordered) {
+         // Sorting is applied to the aggregated output, not to the scanned input.
+         double aggregatedBytes = CostUtil.estimateAggregateOutputBytes(
+                 bytesToScan, groupBy, aggregators.getEstimatedByteSize());
+         total = total.plus(CostUtil.estimateOrderByCost(aggregatedBytes, parallelism));
+     }
+     return total;
+ }
+
@Override
public List<KeyRange> getSplits() {
if (splits == null)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index c1ddd44..31f67b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -63,6 +63,8 @@ import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PName;
@@ -500,13 +502,24 @@ public abstract class BaseQueryPlan implements QueryPlan {
if (context.getScanRanges() == ScanRanges.NOTHING) {
return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + getTableRef().getTable().getName().getString()));
}
-
+
+ // If cost-based optimizer is enabled, we need to initialize a dummy iterator to
+ // get the stats for computing costs.
+ boolean costBased =
+ context.getConnection().getQueryServices().getConfiguration().getBoolean(
+ QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
+ if (costBased) {
+ ResultIterator iterator = iterator();
+ iterator.close();
+ }
// Optimize here when getting explain plan, as queries don't get optimized until after compilation
QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
ExplainPlan exp = plan instanceof BaseQueryPlan ? new ExplainPlan(getPlanSteps(plan.iterator())) : plan.getExplainPlan();
- this.estimatedRows = plan.getEstimatedRowsToScan();
- this.estimatedSize = plan.getEstimatedBytesToScan();
- this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
+ if (!costBased) { // do not override estimates if they are used for cost calculation.
+ this.estimatedRows = plan.getEstimatedRowsToScan();
+ this.estimatedSize = plan.getEstimatedBytesToScan();
+ this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
+ }
return exp;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 8ef1f8d..a15ab35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -56,12 +56,14 @@ import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.CostUtil;
import org.apache.phoenix.util.TupleUtil;
import com.google.common.collect.Lists;
@@ -87,6 +89,32 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
}
@Override
+ public Cost getCost() {
+     // Cost of the client-side aggregation (plus sorting, if any) added on top of
+     // whatever the superclass reports.
+     Long bytesToScan;
+     try {
+         bytesToScan = getEstimatedBytesToScan();
+     } catch (SQLException e) {
+         // No estimate could be obtained; treated like a missing estimate below.
+         bytesToScan = null;
+     }
+     if (bytesToScan == null) {
+         return Cost.UNKNOWN;
+     }
+
+     int parallelism = CostUtil.estimateParallelLevel(
+             false, context.getConnection().getQueryServices());
+     Cost ownCost = CostUtil.estimateAggregateCost(bytesToScan,
+             groupBy, clientAggregators.getEstimatedByteSize(), parallelism);
+     boolean ordered = !orderBy.getOrderByExpressions().isEmpty();
+     if (ordered) {
+         // Sorting is applied to the aggregated output, not to the scanned input.
+         double aggregatedBytes = CostUtil.estimateAggregateOutputBytes(
+                 bytesToScan, groupBy, clientAggregators.getEstimatedByteSize());
+         ownCost = ownCost.plus(CostUtil.estimateOrderByCost(aggregatedBytes, parallelism));
+     }
+     Cost inherited = super.getCost();
+     return inherited.plus(ownCost);
+ }
+
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
ResultIterator iterator = delegate.iterator(scanGrouper, scan);
if (where != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 6bbc545..5799990 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -34,10 +34,12 @@ import org.apache.phoenix.iterate.OrderedResultIterator;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
import com.google.common.collect.Lists;
@@ -50,6 +52,29 @@ public class ClientScanPlan extends ClientProcessingPlan {
}
@Override
+ public Cost getCost() {
+     // Cost of the client-side pass over the estimated bytes (plus sorting, if any)
+     // added on top of whatever the superclass reports.
+     Long bytesToScan;
+     try {
+         bytesToScan = getEstimatedBytesToScan();
+     } catch (SQLException e) {
+         // No estimate could be obtained; treated like a missing estimate below.
+         bytesToScan = null;
+     }
+     if (bytesToScan == null) {
+         return Cost.UNKNOWN;
+     }
+
+     Cost ownCost = new Cost(0, 0, bytesToScan);
+     int parallelism = CostUtil.estimateParallelLevel(
+             false, context.getConnection().getQueryServices());
+     boolean ordered = !orderBy.getOrderByExpressions().isEmpty();
+     if (ordered) {
+         ownCost = ownCost.plus(CostUtil.estimateOrderByCost(bytesToScan, parallelism));
+     }
+     Cost inherited = super.getCost();
+     return inherited.plus(ownCost);
+ }
+
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
ResultIterator iterator = delegate.iterator(scanGrouper, scan);
if (where != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index ee81c36..270ad3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
@@ -200,4 +201,28 @@ public class CorrelatePlan extends DelegateQueryPlan {
return null;
}
+ @Override
+ public Cost getCost() {
+ Long lhsByteCount = null;
+ try {
+ lhsByteCount = delegate.getEstimatedBytesToScan();
+ } catch (SQLException e) {
+ // ignored.
+ }
+ Long rhsRowCount = null;
+ try {
+ rhsRowCount = rhs.getEstimatedRowsToScan();
+ } catch (SQLException e) {
+ // ignored.
+ }
+
+ if (lhsByteCount == null || rhsRowCount == null) {
+ return Cost.UNKNOWN;
+ }
+
+ Cost cost = new Cost(0, 0, lhsByteCount * rhsRowCount);
+ Cost lhsCost = delegate.getCost();
+ return cost.plus(lhsCost).plus(rhs.getCost());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 3c62c5b..3da06db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -59,6 +60,11 @@ public abstract class DelegateQueryPlan implements QueryPlan {
}
@Override
+ public Cost getCost() {
+ // Pure pass-through: the cost reported is exactly the wrapped plan's cost.
+ return delegate.getCost();
+ }
+
+ @Override
public TableRef getTableRef() {
return delegate.getTableRef();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 2b90dcb..2d2ff4e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
@@ -290,6 +291,34 @@ public class HashJoinPlan extends DelegateQueryPlan {
return statement;
}
+ /**
+  * Estimates the cost of this hash join as a term built from the estimated bytes
+  * to scan, plus the (possibly scaled-down) cost of the delegate plan, plus the
+  * summed costs of the inner plans of all sub-plans.
+  *
+  * @return {@code Cost.UNKNOWN} when no byte estimate is available, otherwise the combined cost
+  */
+ @Override
+ public Cost getCost() {
+     Long bytesToScan;
+     try {
+         bytesToScan = getEstimatedBytesToScan();
+     } catch (SQLException e) {
+         // No estimate could be obtained; treated like a missing estimate below.
+         bytesToScan = null;
+     }
+     if (bytesToScan == null) {
+         return Cost.UNKNOWN;
+     }
+
+     Cost joinCost = new Cost(0, 0, bytesToScan);
+
+     Cost leftCost = delegate.getCost();
+     if (keyRangeExpressions != null) {
+         // The selectivity of the dynamic rowkey filter.
+         // TODO replace the constant with an estimate value.
+         final double dynamicFilterSelectivity = 0.01;
+         leftCost = leftCost.multiplyBy(dynamicFilterSelectivity);
+     }
+
+     Cost rightCost = Cost.ZERO;
+     for (SubPlan sub : subPlans) {
+         Cost innerCost = sub.getInnerPlan().getCost();
+         rightCost = rightCost.plus(innerCost);
+     }
+     return joinCost.plus(leftCost).plus(rightCost);
+ }
+
protected interface SubPlan {
public ServerCache execute(HashJoinPlan parent) throws SQLException;
public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 86f59c5..1d1332d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.TableRef;
@@ -60,6 +61,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
}
@Override
+ public Cost getCost() {
+ // This plan always reports the fixed zero cost; no estimate is consulted.
+ return Cost.ZERO;
+ }
+
+ @Override
public List<KeyRange> getSplits() {
return Collections.emptyList();
}