You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/04/08 03:29:22 UTC
[3/7] hbase git commit: HBASE-13205 [branch-1] Backport HBASE-11598
Add simple rpc throttling (Ashish Singhi)
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-protocol/src/main/protobuf/HBase.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/HBase.proto b/hbase-protocol/src/main/protobuf/HBase.proto
index ca09777..f78163e 100644
--- a/hbase-protocol/src/main/protobuf/HBase.proto
+++ b/hbase-protocol/src/main/protobuf/HBase.proto
@@ -180,6 +180,16 @@ message ProcedureDescription {
message EmptyMsg {
}
+enum TimeUnit {
+ NANOSECONDS = 1;
+ MICROSECONDS = 2;
+ MILLISECONDS = 3;
+ SECONDS = 4;
+ MINUTES = 5;
+ HOURS = 6;
+ DAYS = 7;
+}
+
message LongMsg {
required int64 long_msg = 1;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-protocol/src/main/protobuf/Master.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Master.proto b/hbase-protocol/src/main/protobuf/Master.proto
index 462bb6a..e2814e7 100644
--- a/hbase-protocol/src/main/protobuf/Master.proto
+++ b/hbase-protocol/src/main/protobuf/Master.proto
@@ -28,6 +28,7 @@ option optimize_for = SPEED;
import "HBase.proto";
import "Client.proto";
import "ClusterStatus.proto";
+import "Quota.proto";
/* Column-level protobufs */
@@ -371,6 +372,20 @@ message IsProcedureDoneResponse {
optional ProcedureDescription snapshot = 2;
}
+message SetQuotaRequest {
+ optional string user_name = 1;
+ optional string user_group = 2;
+ optional string namespace = 3;
+ optional TableName table_name = 4;
+
+ optional bool remove_all = 5;
+ optional bool bypass_globals = 6;
+ optional ThrottleRequest throttle = 7;
+}
+
+message SetQuotaResponse {
+}
+
message MajorCompactionTimestampRequest {
required TableName table_name = 1;
}
@@ -597,6 +612,9 @@ service MasterService {
rpc ListTableNamesByNamespace(ListTableNamesByNamespaceRequest)
returns(ListTableNamesByNamespaceResponse);
+ /** Apply the new quota settings */
+ rpc SetQuota(SetQuotaRequest) returns(SetQuotaResponse);
+
/** Returns the timestamp of the last major compaction */
rpc getLastMajorCompactionTimestamp(MajorCompactionTimestampRequest)
returns(MajorCompactionTimestampResponse);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-protocol/src/main/protobuf/Quota.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Quota.proto b/hbase-protocol/src/main/protobuf/Quota.proto
new file mode 100644
index 0000000..6ef15fe
--- /dev/null
+++ b/hbase-protocol/src/main/protobuf/Quota.proto
@@ -0,0 +1,73 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "QuotaProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "HBase.proto";
+
+enum QuotaScope {
+ CLUSTER = 1;
+ MACHINE = 2;
+}
+
+message TimedQuota {
+ required TimeUnit time_unit = 1;
+ optional uint64 soft_limit = 2;
+ optional float share = 3;
+ optional QuotaScope scope = 4 [default = MACHINE];
+}
+
+enum ThrottleType {
+ REQUEST_NUMBER = 1;
+ REQUEST_SIZE = 2;
+ WRITE_NUMBER = 3;
+ WRITE_SIZE = 4;
+ READ_NUMBER = 5;
+ READ_SIZE = 6;
+}
+
+message Throttle {
+ optional TimedQuota req_num = 1;
+ optional TimedQuota req_size = 2;
+
+ optional TimedQuota write_num = 3;
+ optional TimedQuota write_size = 4;
+
+ optional TimedQuota read_num = 5;
+ optional TimedQuota read_size = 6;
+}
+
+message ThrottleRequest {
+ optional ThrottleType type = 1;
+ optional TimedQuota timed_quota = 2;
+}
+
+enum QuotaType {
+ THROTTLE = 1;
+}
+
+message Quotas {
+ optional bool bypass_globals = 1 [default = false];
+ optional Throttle throttle = 2;
+}
+
+message QuotaUsage {
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java
index 98c0563..49f21d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterAndRegionObserver.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import java.io.IOException;
import java.util.List;
@@ -467,4 +468,54 @@ public abstract class BaseMasterAndRegionObserver extends BaseRegionObserver
public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
}
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final String namespace, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final String namespace, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String namespace, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String namespace, final Quotas quotas) throws IOException {
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java
index 4748a1b..99a8552 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import java.io.IOException;
import java.util.List;
@@ -462,4 +463,53 @@ public class BaseMasterObserver implements MasterObserver {
TableName tableName) throws IOException {
}
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final String namespace, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final String namespace, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String namespace, final Quotas quotas) throws IOException {
+ }
+
+ @Override
+ public void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String namespace, final Quotas quotas) throws IOException {
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index 2d99754..5dc50da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
/**
* Defines coprocessor hooks for interacting with operations on the
@@ -842,4 +843,108 @@ public interface MasterObserver extends Coprocessor {
*/
void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final TableName tableName) throws IOException;
+
+ /**
+ * Called before the quota for the user is stored.
+ * @param ctx the environment to interact with the framework and master
+ * @param userName the name of the user
+ * @param quotas the quota settings
+ * @throws IOException
+ */
+ void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final Quotas quotas) throws IOException;
+
+ /**
+ * Called after the quota for the user is stored.
+ * @param ctx the environment to interact with the framework and master
+ * @param userName the name of the user
+ * @param quotas the quota settings
+ * @throws IOException
+ */
+ void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final Quotas quotas) throws IOException;
+
+ /**
+ * Called before the quota for the user on the specified table is stored.
+ * @param ctx the environment to interact with the framework and master
+ * @param userName the name of the user
+ * @param tableName the name of the table
+ * @param quotas the quota settings
+ * @throws IOException
+ */
+ void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final TableName tableName, final Quotas quotas) throws IOException;
+
+ /**
+ * Called after the quota for the user on the specified table is stored.
+ * @param ctx the environment to interact with the framework and master
+ * @param userName the name of the user
+ * @param tableName the name of the table
+ * @param quotas the quota settings
+ * @throws IOException
+ */
+ void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final TableName tableName, final Quotas quotas) throws IOException;
+
+ /**
+ * Called before the quota for the user on the specified namespace is stored.
+ * @param ctx the environment to interact with the framework and master
+ * @param userName the name of the user
+ * @param namespace the name of the namespace
+ * @param quotas the quota settings
+ * @throws IOException
+ */
+ void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final String namespace, final Quotas quotas) throws IOException;
+
+ /**
+ * Called after the quota for the user on the specified namespace is stored.
+ * @param ctx the environment to interact with the framework and master
+ * @param userName the name of the user
+ * @param namespace the name of the namespace
+ * @param quotas the quota settings
+ * @throws IOException
+ */
+ void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String userName, final String namespace, final Quotas quotas) throws IOException;
+
+ /**
+ * Called before the quota for the table is stored.
+ * @param ctx the environment to interact with the framework and master
+ * @param tableName the name of the table
+ * @param quotas the quota settings
+ * @throws IOException
+ */
+ void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName, final Quotas quotas) throws IOException;
+
+ /**
+ * Called after the quota for the table is stored.
+ * @param ctx the environment to interact with the framework and master
+ * @param tableName the name of the table
+ * @param quotas the quota settings
+ * @throws IOException
+ */
+ void postSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final TableName tableName, final Quotas quotas) throws IOException;
+
+ /**
+ * Called before the quota for the namespace is stored.
+ * @param ctx the environment to interact with the framework and master
+ * @param namespace the name of the namespace
+ * @param quotas the quota settings
+ * @throws IOException
+ */
+ void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String namespace, final Quotas quotas) throws IOException;
+
+ /**
+ * Called after the quota for the namespace is stored.
+ * @param ctx the environment to interact with the framework and master
+ * @param namespace the name of the namespace
+ * @param quotas the quota settings
+ * @throws IOException
+ */
+ void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+ final String namespace, final Quotas quotas) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index ad49036..891a999 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -2477,6 +2477,7 @@ public class RpcServer implements RpcServerInterface {
}
}
+ @Override
public RpcScheduler getScheduler() {
return scheduler;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index b133ed6..013d256 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -73,4 +73,6 @@ public interface RpcServerInterface {
*/
@VisibleForTesting
void refreshAuthManager(PolicyProvider pp);
+
+ RpcScheduler getScheduler();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0b6200e..a7006bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -109,6 +109,7 @@ import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
@@ -290,6 +291,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
SnapshotManager snapshotManager;
// monitor for distributed procedures
MasterProcedureManagerHost mpmHost;
+
+ private MasterQuotaManager quotaManager;
/** flag used in test cases in order to simulate RS failures during master initialization */
private volatile boolean initializationBeforeMetaAssignment = false;
@@ -721,6 +724,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
status.setStatus("Starting namespace manager");
initNamespace();
+
+ status.setStatus("Starting quota manager");
+ initQuotaManager();
if (this.cpHost != null) {
try {
@@ -761,6 +767,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
zombieDetector.interrupt();
}
+ private void initQuotaManager() throws IOException {
+ quotaManager = new MasterQuotaManager(this);
+ quotaManager.start();
+ }
+
/**
* Create a {@link ServerManager} instance.
* @param master
@@ -1063,6 +1074,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
// Clean up and close up shop
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
+ if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
@@ -1861,6 +1873,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
public MasterCoprocessorHost getMasterCoprocessorHost() {
return cpHost;
}
+
+ @Override
+ public MasterQuotaManager getMasterQuotaManager() {
+ return quotaManager;
+ }
@Override
public ServerName getServerName() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 0fffdab..3c92f72 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import java.io.IOException;
import java.util.List;
@@ -930,6 +931,111 @@ public class MasterCoprocessorHost
}
});
}
+
+ public void preSetUserQuota(final String user, final Quotas quotas) throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preSetUserQuota(ctx, user, quotas);
+ }
+ });
+ }
+
+ public void postSetUserQuota(final String user, final Quotas quotas) throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postSetUserQuota(ctx, user, quotas);
+ }
+ });
+ }
+
+ public void preSetUserQuota(final String user, final TableName table, final Quotas quotas)
+ throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preSetUserQuota(ctx, user, table, quotas);
+ }
+ });
+ }
+
+ public void postSetUserQuota(final String user, final TableName table, final Quotas quotas)
+ throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postSetUserQuota(ctx, user, table, quotas);
+ }
+ });
+ }
+
+ public void preSetUserQuota(final String user, final String namespace, final Quotas quotas)
+ throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preSetUserQuota(ctx, user, namespace, quotas);
+ }
+ });
+ }
+
+ public void postSetUserQuota(final String user, final String namespace, final Quotas quotas)
+ throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postSetUserQuota(ctx, user, namespace, quotas);
+ }
+ });
+ }
+
+ public void preSetTableQuota(final TableName table, final Quotas quotas) throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preSetTableQuota(ctx, table, quotas);
+ }
+ });
+ }
+
+ public void postSetTableQuota(final TableName table, final Quotas quotas) throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postSetTableQuota(ctx, table, quotas);
+ }
+ });
+ }
+
+ public void preSetNamespaceQuota(final String namespace, final Quotas quotas) throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preSetNamespaceQuota(ctx, namespace, quotas);
+ }
+ });
+ }
+
+ public void postSetNamespaceQuota(final String namespace, final Quotas quotas)
+ throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ @Override
+ public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postSetNamespaceQuota(ctx, namespace, quotas);
+ }
+ });
+ }
private static abstract class CoprocessorOperation
extends ObserverContext<MasterCoprocessorEnvironment> {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 98f7507..e3e4099 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -129,6 +129,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanReq
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
@@ -1290,4 +1292,14 @@ public class MasterRpcServices extends RSRpcServices
response.setEnabled(master.isBalancerOn());
return response.build();
}
+
+ @Override
+ public SetQuotaResponse setQuota(RpcController c, SetQuotaRequest req) throws ServiceException {
+ try {
+ master.checkInitialized();
+ return master.getMasterQuotaManager().setQuota(req);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 458e53c..dbe7b68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import com.google.protobuf.Service;
@@ -266,4 +267,10 @@ public interface MasterServices extends Server {
* @throws IOException
*/
public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException;
+
+ /**
+ * @return Master's instance of {@link MasterQuotaManager}
+ */
+ MasterQuotaManager getMasterQuotaManager();
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
new file mode 100644
index 0000000..654e8fa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * {@link OperationQuota} that validates an operation against an ordered list of
+ * {@link QuotaLimiter}s. The cost of the operation is estimated up front in
+ * {@link #checkQuota(int, int, int)} and corrected in {@link #close()} using the sizes recorded
+ * through the add* methods.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DefaultOperationQuota implements OperationQuota {
+  private static final Log LOG = LogFactory.getLog(DefaultOperationQuota.class);
+
+  private final List<QuotaLimiter> limiters;
+  private long writeAvailable = 0;
+  private long readAvailable = 0;
+  private long writeConsumed = 0;
+  private long readConsumed = 0;
+
+  private AvgOperationSize avgOpSize = new AvgOperationSize();
+
+  public DefaultOperationQuota(final QuotaLimiter... limiters) {
+    this(Arrays.asList(limiters));
+  }
+
+  /**
+   * NOTE: The order matters. It should be something like [user, table, namespace, global]
+   */
+  public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
+    this.limiters = limiters;
+  }
+
+  /**
+   * Estimates the write and read cost of the operation, asks every non-bypass limiter to accept
+   * it and, only if none of them throws, grabs the estimate from all limiters.
+   */
+  @Override
+  public void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException {
+    // Fallback per-request sizes (100 for mutate/get, 1000 for scan) apply only while no limiter
+    // reports a positive average for that operation type.
+    writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
+    readConsumed = estimateConsume(OperationType.GET, numReads, 100);
+    readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
+
+    // First pass: validate against each limiter and track the tightest remaining allowance.
+    writeAvailable = Long.MAX_VALUE;
+    readAvailable = Long.MAX_VALUE;
+    for (final QuotaLimiter candidate : limiters) {
+      if (!candidate.isBypass()) {
+        candidate.checkQuota(writeConsumed, readConsumed);
+        readAvailable = Math.min(readAvailable, candidate.getReadAvailable());
+        writeAvailable = Math.min(writeAvailable, candidate.getWriteAvailable());
+      }
+    }
+
+    // Second pass: reached only when every check above succeeded.
+    for (final QuotaLimiter candidate : limiters) {
+      candidate.grabQuota(writeConsumed, readConsumed);
+    }
+  }
+
+  /**
+   * Publishes the average sizes observed during this operation to every limiter, then charges (or
+   * refunds) the difference between the recorded sizes and the estimate taken in checkQuota.
+   */
+  @Override
+  public void close() {
+    // Calculate and set the average size of get, scan and mutate for the current operation
+    final long avgGet = avgOpSize.getAvgOperationSize(OperationType.GET);
+    final long avgScan = avgOpSize.getAvgOperationSize(OperationType.SCAN);
+    final long avgMutate = avgOpSize.getAvgOperationSize(OperationType.MUTATE);
+    for (final QuotaLimiter target : limiters) {
+      target.addOperationSize(OperationType.GET, avgGet);
+      target.addOperationSize(OperationType.SCAN, avgScan);
+      target.addOperationSize(OperationType.MUTATE, avgMutate);
+    }
+
+    // Adjust the quota consumed for the specified operation
+    final long recordedWrite = avgOpSize.getOperationSize(OperationType.MUTATE);
+    final long recordedRead = avgOpSize.getOperationSize(OperationType.GET)
+        + avgOpSize.getOperationSize(OperationType.SCAN);
+    final long writeDiff = recordedWrite - writeConsumed;
+    final long readDiff = recordedRead - readConsumed;
+    for (final QuotaLimiter target : limiters) {
+      if (writeDiff != 0) {
+        target.consumeWrite(writeDiff);
+      }
+      if (readDiff != 0) {
+        target.consumeRead(readDiff);
+      }
+    }
+  }
+
+  @Override
+  public long getReadAvailable() {
+    return readAvailable;
+  }
+
+  @Override
+  public long getWriteAvailable() {
+    return writeAvailable;
+  }
+
+  @Override
+  public void addGetResult(final Result result) {
+    avgOpSize.addGetResult(result);
+  }
+
+  @Override
+  public void addScanResult(final List<Result> results) {
+    avgOpSize.addScanResult(results);
+  }
+
+  @Override
+  public void addMutation(final Mutation mutation) {
+    avgOpSize.addMutation(mutation);
+  }
+
+  @Override
+  public long getAvgOperationSize(OperationType type) {
+    return avgOpSize.getAvgOperationSize(type);
+  }
+
+  /**
+   * Estimated total size of numReqs requests of the given type: the first positive average
+   * reported by a limiter (in list order) times numReqs, or avgSize times numReqs when no limiter
+   * has one. Returns 0 when numReqs is not positive.
+   */
+  private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
+    if (numReqs <= 0) {
+      return 0;
+    }
+    long perRequest = avgSize;
+    for (final QuotaLimiter limiter : limiters) {
+      final long known = limiter.getAvgOperationSize(type);
+      if (known > 0) {
+        perRequest = known;
+        break;
+      }
+    }
+    return perRequest * numReqs;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
new file mode 100644
index 0000000..af7efb2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
@@ -0,0 +1,441 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.ThrottleRequest;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
+
+/**
+ * Master Quota Manager. It is responsible for initializing the quota table on the first run and
+ * for providing the admin operations to interact with the quota table. TODO: FUTURE: The master
+ * will be responsible for notifying each RS of quota changes and it will do the "quota
+ * aggregation" when the QuotaScope is CLUSTER.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MasterQuotaManager {
+ private static final Log LOG = LogFactory.getLog(MasterQuotaManager.class);
+
+ private final MasterServices masterServices;
+ private NamedLock<String> namespaceLocks;
+ private NamedLock<TableName> tableLocks;
+ private NamedLock<String> userLocks;
+ private boolean enabled = false;
+
+ /**
+ * Creates the manager without touching the quota table; quota support stays disabled until
+ * {@link #start()} enables it.
+ * @param masterServices master services this manager reads its configuration, connection,
+ * coprocessor host and executor service from
+ */
+ public MasterQuotaManager(final MasterServices masterServices) {
+ this.masterServices = masterServices;
+ }
+
+  /**
+   * Enables quota support on the master. If the configuration has quotas turned off this only
+   * logs and returns. Otherwise it requests creation of the quota table when
+   * {@link MetaTableAccessor} does not find it, creates the namespace/table/user locks and marks
+   * the manager as enabled.
+   * @throws IOException if the table lookup or the table creation request fails
+   */
+  public void start() throws IOException {
+    final boolean quotaConfigured = QuotaUtil.isQuotaEnabled(masterServices.getConfiguration());
+    if (!quotaConfigured) {
+      // If the user doesn't want the quota support skip all the initializations.
+      LOG.info("Quota support disabled");
+      return;
+    }
+
+    // Create the quota table if missing
+    final boolean quotaTableExists =
+        MetaTableAccessor.tableExists(masterServices.getConnection(), QuotaUtil.QUOTA_TABLE_NAME);
+    if (!quotaTableExists) {
+      LOG.info("Quota table not found. Creating...");
+      createQuotaTable();
+    }
+
+    LOG.info("Initializing quota support");
+    namespaceLocks = new NamedLock<String>();
+    tableLocks = new NamedLock<TableName>();
+    userLocks = new NamedLock<String>();
+
+    enabled = true;
+  }
+
+ /** Stops the manager; the current implementation does nothing. */
+ public void stop() {
+ }
+
+ /** @return true once {@link #start()} has finished with quota support turned on */
+ public boolean isQuotaEnabled() {
+ return enabled;
+ }
+
+ /*
+ * ========================================================================== Admin operations to
+ * manage the quota table
+ */
+  /**
+   * Applies a quota request to the quota table. The target is picked in this order: a user
+   * (optionally narrowed to a table or a namespace), then a table, then a namespace. The named
+   * lock of the chosen user, table or namespace is held while the change is applied.
+   * @param req the quota change to apply
+   * @return an empty response
+   * @throws IOException if quota support is disabled, if the request names no user, table or
+   *           namespace, or if applying the change fails
+   * @throws InterruptedException if interrupted while waiting for the named lock
+   */
+  public SetQuotaResponse setQuota(final SetQuotaRequest req) throws IOException,
+      InterruptedException {
+    checkQuotaSupport();
+
+    if (req.hasUserName()) {
+      final String userName = req.getUserName();
+      userLocks.lock(userName);
+      try {
+        if (req.hasTableName()) {
+          setUserQuota(userName, ProtobufUtil.toTableName(req.getTableName()), req);
+        } else if (req.hasNamespace()) {
+          setUserQuota(userName, req.getNamespace(), req);
+        } else {
+          setUserQuota(userName, req);
+        }
+      } finally {
+        userLocks.unlock(userName);
+      }
+    } else if (req.hasTableName()) {
+      final TableName tableName = ProtobufUtil.toTableName(req.getTableName());
+      tableLocks.lock(tableName);
+      try {
+        setTableQuota(tableName, req);
+      } finally {
+        tableLocks.unlock(tableName);
+      }
+    } else if (req.hasNamespace()) {
+      final String namespace = req.getNamespace();
+      namespaceLocks.lock(namespace);
+      try {
+        setNamespaceQuota(namespace, req);
+      } finally {
+        namespaceLocks.unlock(namespace);
+      }
+    } else {
+      throw new DoNotRetryIOException(new UnsupportedOperationException(
+          "a user, a table or a namespace must be specified"));
+    }
+    return SetQuotaResponse.newBuilder().build();
+  }
+
+ /**
+ * Applies the request to the quota entry keyed by user name only. Storage access goes through
+ * {@link QuotaUtil}; the master coprocessor host's preSetUserQuota/postSetUserQuota hooks are
+ * called around the change. This method does not take the user lock or check that quota support
+ * is enabled; {@link #setQuota(SetQuotaRequest)} does both before calling it.
+ * @param userName user whose quota is changed
+ * @param req the quota change to apply
+ */
+ public void setUserQuota(final String userName, final SetQuotaRequest req) throws IOException,
+ InterruptedException {
+ setQuota(req, new SetQuotaOperations() {
+ @Override
+ public Quotas fetch() throws IOException {
+ return QuotaUtil.getUserQuota(masterServices.getConnection(), userName);
+ }
+
+ @Override
+ public void update(final Quotas quotas) throws IOException {
+ QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotas);
+ }
+
+ @Override
+ public void delete() throws IOException {
+ QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
+ }
+
+ @Override
+ public void preApply(final Quotas quotas) throws IOException {
+ masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotas);
+ }
+
+ @Override
+ public void postApply(final Quotas quotas) throws IOException {
+ masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotas);
+ }
+ });
+ }
+
+ /**
+ * Applies the request to the quota entry of a user on one table. Storage access goes through
+ * {@link QuotaUtil}; the master coprocessor host's preSetUserQuota/postSetUserQuota hooks
+ * (table variant) are called around the change. This method does not take the user lock or
+ * check that quota support is enabled; {@link #setQuota(SetQuotaRequest)} does both before
+ * calling it.
+ * @param userName user whose quota is changed
+ * @param table table the user quota is scoped to
+ * @param req the quota change to apply
+ */
+ public void setUserQuota(final String userName, final TableName table, final SetQuotaRequest req)
+ throws IOException, InterruptedException {
+ setQuota(req, new SetQuotaOperations() {
+ @Override
+ public Quotas fetch() throws IOException {
+ return QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table);
+ }
+
+ @Override
+ public void update(final Quotas quotas) throws IOException {
+ QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table, quotas);
+ }
+
+ @Override
+ public void delete() throws IOException {
+ QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
+ }
+
+ @Override
+ public void preApply(final Quotas quotas) throws IOException {
+ masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotas);
+ }
+
+ @Override
+ public void postApply(final Quotas quotas) throws IOException {
+ masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotas);
+ }
+ });
+ }
+
+ /**
+ * Applies the request to the quota entry of a user within one namespace. Storage access goes
+ * through {@link QuotaUtil}; the master coprocessor host's preSetUserQuota/postSetUserQuota
+ * hooks (namespace variant) are called around the change. This method does not take the user
+ * lock or check that quota support is enabled; {@link #setQuota(SetQuotaRequest)} does both
+ * before calling it.
+ * @param userName user whose quota is changed
+ * @param namespace namespace the user quota is scoped to
+ * @param req the quota change to apply
+ */
+ public void
+ setUserQuota(final String userName, final String namespace, final SetQuotaRequest req)
+ throws IOException, InterruptedException {
+ setQuota(req, new SetQuotaOperations() {
+ @Override
+ public Quotas fetch() throws IOException {
+ return QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace);
+ }
+
+ @Override
+ public void update(final Quotas quotas) throws IOException {
+ QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace, quotas);
+ }
+
+ @Override
+ public void delete() throws IOException {
+ QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
+ }
+
+ @Override
+ public void preApply(final Quotas quotas) throws IOException {
+ masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, namespace, quotas);
+ }
+
+ @Override
+ public void postApply(final Quotas quotas) throws IOException {
+ masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, namespace, quotas);
+ }
+ });
+ }
+
+ /**
+ * Applies the request to the quota entry of a table. Storage access goes through
+ * {@link QuotaUtil}; the master coprocessor host's preSetTableQuota/postSetTableQuota hooks are
+ * called around the change. This method does not take the table lock or check that quota
+ * support is enabled; {@link #setQuota(SetQuotaRequest)} does both before calling it.
+ * @param table table whose quota is changed
+ * @param req the quota change to apply
+ */
+ public void setTableQuota(final TableName table, final SetQuotaRequest req) throws IOException,
+ InterruptedException {
+ setQuota(req, new SetQuotaOperations() {
+ @Override
+ public Quotas fetch() throws IOException {
+ return QuotaUtil.getTableQuota(masterServices.getConnection(), table);
+ }
+
+ @Override
+ public void update(final Quotas quotas) throws IOException {
+ QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotas);
+ }
+
+ @Override
+ public void delete() throws IOException {
+ QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
+ }
+
+ @Override
+ public void preApply(final Quotas quotas) throws IOException {
+ masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotas);
+ }
+
+ @Override
+ public void postApply(final Quotas quotas) throws IOException {
+ masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotas);
+ }
+ });
+ }
+
+ /**
+ * Applies the request to the quota entry of a namespace. Storage access goes through
+ * {@link QuotaUtil}; the master coprocessor host's preSetNamespaceQuota/postSetNamespaceQuota
+ * hooks are called around the change. This method does not take the namespace lock or check
+ * that quota support is enabled; {@link #setQuota(SetQuotaRequest)} does both before calling it.
+ * @param namespace namespace whose quota is changed
+ * @param req the quota change to apply
+ */
+ public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
+ throws IOException, InterruptedException {
+ setQuota(req, new SetQuotaOperations() {
+ @Override
+ public Quotas fetch() throws IOException {
+ return QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace);
+ }
+
+ @Override
+ public void update(final Quotas quotas) throws IOException {
+ QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace, quotas);
+ }
+
+ @Override
+ public void delete() throws IOException {
+ QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
+ }
+
+ @Override
+ public void preApply(final Quotas quotas) throws IOException {
+ masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotas);
+ }
+
+ @Override
+ public void postApply(final Quotas quotas) throws IOException {
+ masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotas);
+ }
+ });
+ }
+
+  /**
+   * Shared read-modify-write sequence behind every set*Quota method.
+   * <p>
+   * A remove-all request deletes the stored entry, with the pre/post hooks receiving null.
+   * Otherwise the stored quotas are fetched, the pre hook sees the fetched value, the throttle
+   * and bypass changes of the request are merged in, and the result is either written back or,
+   * if it ended up empty, deleted. The post hook sees the merged value.
+   * @param req the quota change to apply
+   * @param quotaOps storage and hook callbacks for the entry being changed
+   */
+  private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
+      throws IOException, InterruptedException {
+    final boolean removeAll = req.hasRemoveAll() && req.getRemoveAll();
+    if (removeAll) {
+      quotaOps.preApply(null);
+      quotaOps.delete();
+      quotaOps.postApply(null);
+      return;
+    }
+
+    // Apply quota changes
+    final Quotas stored = quotaOps.fetch();
+    quotaOps.preApply(stored);
+
+    final Quotas.Builder merged;
+    if (stored == null) {
+      merged = Quotas.newBuilder();
+    } else {
+      merged = stored.toBuilder();
+    }
+    if (req.hasThrottle()) {
+      applyThrottle(merged, req.getThrottle());
+    }
+    if (req.hasBypassGlobals()) {
+      applyBypassGlobals(merged, req.getBypassGlobals());
+    }
+
+    // Submit new changes
+    final Quotas result = merged.build();
+    if (QuotaUtil.isEmptyQuota(result)) {
+      quotaOps.delete();
+    } else {
+      quotaOps.update(result);
+    }
+    quotaOps.postApply(result);
+  }
+
+ /**
+ * Callbacks that bind the generic set-quota sequence to one kind of quota entry (user, table,
+ * namespace, ...).
+ */
+ private static interface SetQuotaOperations {
+ /** @return the quotas currently stored for the entry; the caller copes with null */
+ Quotas fetch() throws IOException;
+
+ /** Removes the stored entry. */
+ void delete() throws IOException;
+
+ /** Stores the given quotas for the entry. */
+ void update(final Quotas quotas) throws IOException;
+
+ /** Hook invoked before the change; receives null for a remove-all request. */
+ void preApply(final Quotas quotas) throws IOException;
+
+ /** Hook invoked after the change; receives null for a remove-all request. */
+ void postApply(final Quotas quotas) throws IOException;
+ }
+
+ /*
+ * ========================================================================== Helpers to apply
+ * changes to the quotas
+ */
+  /**
+   * Merges a throttle request into the quotas being built.
+   * <p>
+   * When the request names a throttle type and either carries a timed quota or the builder
+   * already has throttle settings, the setting of that type is replaced by the timed quota, or
+   * cleared when the request carries none. In every other case the whole throttle section is
+   * removed from the builder.
+   * @param quotas builder holding the quotas to modify
+   * @param req throttle change to apply
+   * @throws IOException if the timed quota of the request fails validation
+   */
+  private void applyThrottle(final Quotas.Builder quotas, final ThrottleRequest req)
+      throws IOException {
+    Throttle.Builder throttle;
+
+    if (req.hasType() && (req.hasTimedQuota() || quotas.hasThrottle())) {
+      // Validate timed quota if present
+      if (req.hasTimedQuota()) {
+        validateTimedQuota(req.getTimedQuota());
+      }
+
+      // apply the new settings
+      throttle = quotas.hasThrottle() ? quotas.getThrottle().toBuilder() : Throttle.newBuilder();
+
+      switch (req.getType()) {
+        case REQUEST_NUMBER:
+          if (req.hasTimedQuota()) {
+            throttle.setReqNum(req.getTimedQuota());
+          } else {
+            throttle.clearReqNum();
+          }
+          break;
+        case REQUEST_SIZE:
+          if (req.hasTimedQuota()) {
+            throttle.setReqSize(req.getTimedQuota());
+          } else {
+            throttle.clearReqSize();
+          }
+          break;
+        case WRITE_NUMBER:
+          if (req.hasTimedQuota()) {
+            throttle.setWriteNum(req.getTimedQuota());
+          } else {
+            throttle.clearWriteNum();
+          }
+          break;
+        case WRITE_SIZE:
+          if (req.hasTimedQuota()) {
+            throttle.setWriteSize(req.getTimedQuota());
+          } else {
+            throttle.clearWriteSize();
+          }
+          break;
+        case READ_NUMBER:
+          if (req.hasTimedQuota()) {
+            throttle.setReadNum(req.getTimedQuota());
+          } else {
+            // Must clear the read-number field: clearing ReqNum here would drop an unrelated
+            // request-number throttle and leave the read-number throttle in place.
+            throttle.clearReadNum();
+          }
+          break;
+        case READ_SIZE:
+          if (req.hasTimedQuota()) {
+            throttle.setReadSize(req.getTimedQuota());
+          } else {
+            throttle.clearReadSize();
+          }
+          break;
+        default:
+          throw new RuntimeException("Invalid throttle type: " + req.getType());
+      }
+      quotas.setThrottle(throttle.build());
+    } else {
+      quotas.clearThrottle();
+    }
+  }
+
+  /**
+   * Sets the bypass-globals flag on the builder when requested, otherwise clears the field so
+   * that a false value is never stored.
+   */
+  private void applyBypassGlobals(final Quotas.Builder quotas, boolean bypassGlobals) {
+    if (!bypassGlobals) {
+      quotas.clearBypassGlobals();
+      return;
+    }
+    quotas.setBypassGlobals(true);
+  }
+
+  /**
+   * Rejects timed quotas whose soft limit is below 1.
+   * @throws IOException a DoNotRetryIOException wrapping UnsupportedOperationException when the
+   *           soft limit is not positive
+   */
+  private void validateTimedQuota(final TimedQuota timedQuota) throws IOException {
+    final long softLimit = timedQuota.getSoftLimit();
+    if (softLimit >= 1) {
+      return;
+    }
+    throw new DoNotRetryIOException(new UnsupportedOperationException(
+        "The throttle limit must be greater then 0, got " + softLimit));
+  }
+
+ /*
+ * ========================================================================== Helpers
+ */
+
+  /**
+   * Guard for the admin operations.
+   * @throws IOException a DoNotRetryIOException when quota support has not been enabled
+   */
+  private void checkQuotaSupport() throws IOException {
+    if (enabled) {
+      return;
+    }
+    throw new DoNotRetryIOException(new UnsupportedOperationException("quota support disabled"));
+  }
+
+ /**
+ * Requests creation of the quota table: builds a single region for
+ * {@link QuotaUtil#QUOTA_TABLE_NAME} and submits a prepared {@link CreateTableHandler} to the
+ * master's executor service.
+ * NOTE(review): the handler is only submitted here; whether the table already exists when this
+ * method returns depends on the executor -- confirm before relying on it.
+ */
+ private void createQuotaTable() throws IOException {
+ HRegionInfo[] newRegions = new HRegionInfo[] { new HRegionInfo(QuotaUtil.QUOTA_TABLE_NAME) };
+
+ masterServices.getExecutorService()
+ .submit(
+ new CreateTableHandler(masterServices, masterServices.getMasterFileSystem(),
+ QuotaUtil.QUOTA_TABLE_DESC, masterServices.getConfiguration(), newRegions,
+ masterServices).prepare());
+ }
+
+  /**
+   * Set of mutually independent locks identified by name. A name is locked while it is present
+   * in the set. The locks are not reentrant: locking a name that is already held blocks until
+   * that name is unlocked, even when the holder is the calling thread.
+   */
+  private static class NamedLock<T> {
+    // Held names; the set doubles as the monitor for wait/notifyAll, so the reference is final
+    // to guarantee every thread always synchronizes on the same object.
+    private final HashSet<T> locks = new HashSet<T>();
+
+    /**
+     * Blocks until the name is free, then marks it as held.
+     * @throws InterruptedException if interrupted while waiting; the name is not acquired
+     */
+    public void lock(final T name) throws InterruptedException {
+      synchronized (locks) {
+        // Loop guards against spurious wake-ups and against notifyAll() for other names.
+        while (locks.contains(name)) {
+          locks.wait();
+        }
+        locks.add(name);
+      }
+    }
+
+    /** Releases the name and wakes every waiter so each can re-check its own name. */
+    public void unlock(final T name) {
+      synchronized (locks) {
+        locks.remove(name);
+        locks.notifyAll();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
new file mode 100644
index 0000000..2463ef7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * Noop operation quota returned when no quota is associated to the user/table. Every check
+ * succeeds, nothing is recorded and the available read/write amounts are reported as unlimited.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+final class NoopOperationQuota implements OperationQuota {
+  /** Shared stateless instance; final so the singleton can never be replaced. */
+  private static final OperationQuota INSTANCE = new NoopOperationQuota();
+
+  private NoopOperationQuota() {
+    // no-op
+  }
+
+  /** @return the shared no-op instance */
+  public static OperationQuota get() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException {
+    // no-op
+  }
+
+  @Override
+  public void close() {
+    // no-op
+  }
+
+  @Override
+  public void addGetResult(final Result result) {
+    // no-op
+  }
+
+  @Override
+  public void addScanResult(final List<Result> results) {
+    // no-op
+  }
+
+  @Override
+  public void addMutation(final Mutation mutation) {
+    // no-op
+  }
+
+  @Override
+  public long getReadAvailable() {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public long getWriteAvailable() {
+    return Long.MAX_VALUE;
+  }
+
+  /** @return -1, since no sizes are tracked */
+  @Override
+  public long getAvgOperationSize(OperationType type) {
+    return -1;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
new file mode 100644
index 0000000..699fd1a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
+
+/**
+ * Noop quota limiter returned when no limiter is associated to the user/table. It always reports
+ * itself as bypassed, so callers are expected to skip it before asking for available amounts.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+final class NoopQuotaLimiter implements QuotaLimiter {
+  /** Shared stateless instance; final so the singleton can never be replaced. */
+  private static final QuotaLimiter INSTANCE = new NoopQuotaLimiter();
+
+  private NoopQuotaLimiter() {
+    // no-op
+  }
+
+  @Override
+  public void checkQuota(long estimateWriteSize, long estimateReadSize) throws ThrottlingException {
+    // no-op
+  }
+
+  @Override
+  public void grabQuota(long writeSize, long readSize) {
+    // no-op
+  }
+
+  @Override
+  public void consumeWrite(final long size) {
+    // no-op
+  }
+
+  @Override
+  public void consumeRead(final long size) {
+    // no-op
+  }
+
+  @Override
+  public boolean isBypass() {
+    return true;
+  }
+
+  /** Not supported: a bypassed limiter has no meaningful write allowance. */
+  @Override
+  public long getWriteAvailable() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not supported: a bypassed limiter has no meaningful read allowance. */
+  @Override
+  public long getReadAvailable() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void addOperationSize(OperationType type, long size) {
+    // no-op
+  }
+
+  /** @return -1, since no sizes are tracked */
+  @Override
+  public long getAvgOperationSize(OperationType type) {
+    return -1;
+  }
+
+  @Override
+  public String toString() {
+    return "NoopQuotaLimiter";
+  }
+
+  /** @return the shared no-op instance */
+  public static QuotaLimiter get() {
+    return INSTANCE;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
new file mode 100644
index 0000000..6010c13
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * Interface that allows to check the quota available for an operation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface OperationQuota {
+  // The declaration order is significant: AvgOperationSize indexes its arrays by ordinal().
+  public enum OperationType {
+    MUTATE, GET, SCAN
+  }
+
+  /**
+   * Keeps track of the average data size of operations like get, scan, mutate
+   */
+  public class AvgOperationSize {
+    // One slot per OperationType, indexed by ordinal(); Java zero-initializes both arrays.
+    private final long[] sizeSum;
+    private final long[] count;
+
+    public AvgOperationSize() {
+      final int slots = OperationType.values().length;
+      sizeSum = new long[slots];
+      count = new long[slots];
+    }
+
+    /** Records one operation of the given type; sizes that are not positive are ignored. */
+    public void addOperationSize(OperationType type, long size) {
+      if (size <= 0) {
+        return;
+      }
+      final int slot = type.ordinal();
+      sizeSum[slot] += size;
+      count[slot]++;
+    }
+
+    /** @return the integer average of the recorded sizes, or 0 when nothing was recorded */
+    public long getAvgOperationSize(OperationType type) {
+      final int slot = type.ordinal();
+      if (count[slot] > 0) {
+        return sizeSum[slot] / count[slot];
+      }
+      return 0;
+    }
+
+    /** @return the sum of all sizes recorded for the given type */
+    public long getOperationSize(OperationType type) {
+      return sizeSum[type.ordinal()];
+    }
+
+    public void addGetResult(final Result result) {
+      addOperationSize(OperationType.GET, QuotaUtil.calculateResultSize(result));
+    }
+
+    public void addScanResult(final List<Result> results) {
+      addOperationSize(OperationType.SCAN, QuotaUtil.calculateResultSize(results));
+    }
+
+    public void addMutation(final Mutation mutation) {
+      addOperationSize(OperationType.MUTATE, QuotaUtil.calculateMutationSize(mutation));
+    }
+  }
+
+  /**
+   * Checks if it is possible to execute the specified operation. The quota will be estimated based
+   * on the number of operations to perform and the average size accumulated during time.
+   * @param numWrites number of write operation that will be performed
+   * @param numReads number of small-read operation that will be performed
+   * @param numScans number of long-read operation that will be performed
+   * @throws ThrottlingException if the operation cannot be performed
+   */
+  void checkQuota(int numWrites, int numReads, int numScans) throws ThrottlingException;
+
+  /** Cleanup method on operation completion */
+  void close();
+
+  /**
+   * Add a get result. This will be used to calculate the exact quota and have a better short-read
+   * average size for the next time.
+   */
+  void addGetResult(Result result);
+
+  /**
+   * Add a scan result. This will be used to calculate the exact quota and have a better long-read
+   * average size for the next time.
+   */
+  void addScanResult(List<Result> results);
+
+  /**
+   * Add a mutation result. This will be used to calculate the exact quota and have a better
+   * mutation average size for the next time.
+   */
+  void addMutation(Mutation mutation);
+
+  /** @return the number of bytes available to read to avoid exceeding the quota */
+  long getReadAvailable();
+
+  /** @return the number of bytes available to write to avoid exceeding the quota */
+  long getWriteAvailable();
+
+  /** @return the average data size of the specified operation */
+  long getAvgOperationSize(OperationType type);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
new file mode 100644
index 0000000..39f1456
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Cache that keeps track of the quota settings for the users and tables that are interacting with
+ * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
+ * be returned and the request to fetch the quota information will be enqueued for the next refresh.
+ * TODO: At the moment the cache is refreshed by a chore that runs every refresh period (5min by
+ * default) or on cache-miss events. Later the Quotas will be pushed using the notification system.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class QuotaCache implements Stoppable {
+ private static final Log LOG = LogFactory.getLog(QuotaCache.class);
+
+ public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
+ private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
+ private static final int EVICT_PERIOD_FACTOR = 5; // evict after N * (configured refresh period)
+
+ // for testing purposes only: forces every cached entry to be re-fetched on each chore run
+ private static boolean TEST_FORCE_REFRESH = false;
+
+ private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache =
+ new ConcurrentHashMap<String, QuotaState>();
+ private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache =
+ new ConcurrentHashMap<TableName, QuotaState>();
+ private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache =
+ new ConcurrentHashMap<String, UserQuotaState>();
+ private final RegionServerServices rsServices;
+
+ private QuotaRefresherChore refreshChore;
+ private volatile boolean stopped = true; // volatile: this object is the chore's Stoppable
+
+ public QuotaCache(final RegionServerServices rsServices) {
+ this.rsServices = rsServices;
+ }
+
+ public void start() throws IOException {
+ stopped = false;
+
+ // TODO: This will be replaced once we have the notification bus ready.
+ Configuration conf = rsServices.getConfiguration();
+ int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
+ refreshChore = new QuotaRefresherChore(period, this);
+ rsServices.getChoreService().scheduleChore(refreshChore);
+ }
+
+ @Override
+ public void stop(final String why) {
+ stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ /**
+ * Returns the limiter associated to the specified user/table.
+ * @param ugi the user to limit
+ * @param table the table to limit
+ * @return the limiter associated to the specified user/table
+ */
+ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
+ if (table.isSystemTable()) {
+ return NoopQuotaLimiter.get();
+ }
+ return getUserQuotaState(ugi).getTableLimiter(table);
+ }
+
+ /**
+ * Returns the QuotaState associated to the specified user.
+ * @param ugi the user
+ * @return the quota info associated to specified user
+ */
+ public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
+ String key = ugi.getShortUserName();
+ UserQuotaState quotaInfo = userQuotaCache.get(key);
+ if (quotaInfo == null) {
+ quotaInfo = new UserQuotaState();
+ if (userQuotaCache.putIfAbsent(key, quotaInfo) == null) {
+ triggerCacheRefresh();
+ }
+ }
+ return quotaInfo;
+ }
+
+ /**
+ * Returns the limiter associated to the specified table.
+ * @param table the table to limit
+ * @return the limiter associated to the specified table
+ */
+ public QuotaLimiter getTableLimiter(final TableName table) {
+ return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
+ }
+
+ /**
+ * Returns the limiter associated to the specified namespace.
+ * @param namespace the namespace to limit
+ * @return the limiter associated to the specified namespace
+ */
+ public QuotaLimiter getNamespaceLimiter(final String namespace) {
+ return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
+ }
+
+ /**
+ * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
+ * returned and the quota request will be enqueued for the next cache refresh.
+ */
+ private <K> QuotaState
+ getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap, final K key) {
+ QuotaState quotaInfo = quotasMap.get(key);
+ if (quotaInfo == null) {
+ quotaInfo = new QuotaState();
+ if (quotasMap.putIfAbsent(key, quotaInfo) == null) {
+ triggerCacheRefresh();
+ }
+ }
+ return quotaInfo;
+ }
+
+ @VisibleForTesting
+ void triggerCacheRefresh() {
+ refreshChore.triggerNow();
+ }
+
+ @VisibleForTesting
+ long getLastUpdate() {
+ return refreshChore.lastUpdate;
+ }
+
+ @VisibleForTesting
+ Map<String, QuotaState> getNamespaceQuotaCache() {
+ return namespaceQuotaCache;
+ }
+
+ @VisibleForTesting
+ Map<TableName, QuotaState> getTableQuotaCache() {
+ return tableQuotaCache;
+ }
+
+ @VisibleForTesting
+ Map<String, UserQuotaState> getUserQuotaCache() {
+ return userQuotaCache;
+ }
+
+ public static boolean isTEST_FORCE_REFRESH() {
+ return TEST_FORCE_REFRESH;
+ }
+
+ public static void setTEST_FORCE_REFRESH(boolean tEST_FORCE_REFRESH) {
+ TEST_FORCE_REFRESH = tEST_FORCE_REFRESH;
+ }
+
+ // TODO: Remove this once we have the notification bus
+ private class QuotaRefresherChore extends ScheduledChore {
+ private long lastUpdate = 0;
+
+ public QuotaRefresherChore(final int period, final Stoppable stoppable) {
+ super("QuotaRefresherChore", stoppable, period);
+ }
+
+ // containsKey() is required below: the legacy ConcurrentHashMap.contains(Object) is an alias
+ // of containsValue(), so it could never match a TableName or namespace key.
+ @Override
+ protected void chore() {
+ // Prefetch online tables/namespaces
+ for (TableName table : QuotaCache.this.rsServices.getOnlineTables()) {
+ if (table.isSystemTable()) continue;
+ if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
+ QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
+ }
+ String ns = table.getNamespaceAsString();
+ if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
+ QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
+ }
+ }
+
+ fetchNamespaceQuotaState();
+ fetchTableQuotaState();
+ fetchUserQuotaState();
+ lastUpdate = EnvironmentEdgeManager.currentTime();
+ }
+
+ private void fetchNamespaceQuotaState() {
+ fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
+ @Override
+ public Get makeGet(final Map.Entry<String, QuotaState> entry) {
+ return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
+ }
+
+ @Override
+ public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
+ return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets);
+ }
+ });
+ }
+
+ private void fetchTableQuotaState() {
+ fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
+ @Override
+ public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
+ return QuotaUtil.makeGetForTableQuotas(entry.getKey());
+ }
+
+ @Override
+ public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
+ return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets);
+ }
+ });
+ }
+
+ private void fetchUserQuotaState() {
+ final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
+ final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
+ fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
+ @Override
+ public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
+ return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
+ }
+
+ @Override
+ public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
+ return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets);
+ }
+ });
+ }
+
+ private <K, V extends QuotaState> void fetch(final String type,
+ final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
+ long now = EnvironmentEdgeManager.currentTime();
+ long refreshPeriod = getPeriod();
+ long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
+
+ // Split the entries: queried too long ago -> evict; refreshed too long ago -> re-fetch
+ List<Get> gets = new ArrayList<Get>();
+ List<K> toRemove = new ArrayList<K>();
+ for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
+ long lastUpdate = entry.getValue().getLastUpdate();
+ long lastQuery = entry.getValue().getLastQuery();
+ if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
+ toRemove.add(entry.getKey());
+ } else if (isTEST_FORCE_REFRESH() || (now - lastUpdate) >= refreshPeriod) {
+ gets.add(fetcher.makeGet(entry));
+ }
+ }
+
+ for (final K key : toRemove) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("evict " + type + " key=" + key);
+ }
+ quotasMap.remove(key);
+ }
+
+ // fetch and update the quota entries
+ if (!gets.isEmpty()) {
+ try {
+ for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
+ V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
+ if (quotaInfo != null) {
+ quotaInfo.update(entry.getValue());
+ }
+ V cached = (quotaInfo != null) ? quotaInfo : entry.getValue(); // null: newly inserted
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + cached);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Unable to read " + type + " from quota table", e);
+ }
+ }
+ }
+ }
+
+ static interface Fetcher<Key, Value> {
+ Get makeGet(Map.Entry<Key, Value> entry);
+
+ Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
new file mode 100644
index 0000000..ffacbc0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
+
+/**
+ * Internal interface used to interact with the user/table quota.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface QuotaLimiter {
+ /**
+ * Checks if it is possible to execute the specified operation.
+ *
+ * @param estimateWriteSize the write size that will be checked against the available quota
+ * @param estimateReadSize the read size that will be checked against the available quota
+ * @throws ThrottlingException if there are not enough available resources for the operation
+ */
+ void checkQuota(long estimateWriteSize, long estimateReadSize)
+ throws ThrottlingException;
+
+ /**
+ * Removes the specified write and read amount from the quota.
+ * At this point the write and read amounts are only estimates,
+ * which will later be adjusted with a consumeWrite()/consumeRead() call.
+ *
+ * @param writeSize the write size that will be removed from the current quota
+ * @param readSize the read size that will be removed from the current quota
+ */
+ void grabQuota(long writeSize, long readSize);
+
+ /**
+ * Removes some write amount from the quota, or adds it back.
+ * (called at the end of an operation in case the estimated quota was off)
+ */
+ void consumeWrite(long size);
+
+ /**
+ * Removes some read amount from the quota, or adds it back.
+ * (called at the end of an operation in case the estimated quota was off)
+ */
+ void consumeRead(long size);
+
+ /** @return true if the limiter is a noop */
+ boolean isBypass();
+
+ /** @return the number of bytes available to read to avoid exceeding the quota */
+ long getReadAvailable();
+
+ /** @return the number of bytes available to write to avoid exceeding the quota */
+ long getWriteAvailable();
+
+ /**
+ * Adds a size sample to the average kept for the specified operation type.
+ * The average will be used as the estimate for the next operations.
+ */
+ void addOperationSize(OperationType type, long size);
+
+ /** @return the average data size of the specified operation */
+ long getAvgOperationSize(OperationType type);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiterFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiterFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiterFactory.java
new file mode 100644
index 0000000..e9bb304
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiterFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class QuotaLimiterFactory {
+ private QuotaLimiterFactory() {
+ // static helpers only; never instantiated
+ }
+
+ public static QuotaLimiter fromThrottle(final Throttle throttle) {
+ return TimeBasedLimiter.fromThrottle(throttle);
+ }
+
+ /** Updates {@code a} in place from {@code b}; only same-class TimeBasedLimiter pairs work. */
+ public static QuotaLimiter update(final QuotaLimiter a, final QuotaLimiter b) {
+ if (!a.getClass().equals(b.getClass()) || !(a instanceof TimeBasedLimiter)) {
+ throw new UnsupportedOperationException("TODO not implemented yet");
+ }
+ ((TimeBasedLimiter) a).update((TimeBasedLimiter) b);
+ return a;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c031d8de/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java
new file mode 100644
index 0000000..c015b24
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * In-memory quota state of a table or namespace: the global limiter plus last update/query times.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class QuotaState {
+ private long lastUpdate = 0;
+ private long lastQuery = 0;
+ private QuotaLimiter globalLimiter = NoopQuotaLimiter.get();
+
+ /** Creates a state with the noop limiter and a last-update timestamp of 0. */
+ public QuotaState() {
+ this(0);
+ }
+
+ /** @param updateTs initial value of the last-update timestamp */
+ public QuotaState(final long updateTs) {
+ lastUpdate = updateTs;
+ }
+
+ /** @return the last-update timestamp set at construction or copied by update() */
+ public synchronized long getLastUpdate() {
+ return lastUpdate;
+ }
+
+ /** @return the value passed to the most recent setLastQuery() call, or 0 if there was none */
+ public synchronized long getLastQuery() {
+ return lastQuery;
+ }
+
+ @Override
+ public synchronized String toString() {
+ StringBuilder text = new StringBuilder("QuotaState(ts=");
+ text.append(getLastUpdate());
+ if (isBypass()) {
+ text.append(" bypass");
+ } else if (globalLimiter != NoopQuotaLimiter.get()) {
+ text.append(' ').append(globalLimiter);
+ }
+ text.append(')');
+ return text.toString();
+ }
+
+ /**
+ * @return true if there is no quota information associated to this object
+ */
+ public synchronized boolean isBypass() {
+ return NoopQuotaLimiter.get() == globalLimiter;
+ }
+
+ /**
+ * Installs the global limiter described by the given quotas, or the noop limiter if no throttle.
+ */
+ public synchronized void setQuotas(final Quotas quotas) {
+ globalLimiter = quotas.hasThrottle()
+ ? QuotaLimiterFactory.fromThrottle(quotas.getThrottle())
+ : NoopQuotaLimiter.get();
+ }
+
+ /**
+ * Refreshes this state from another one: adopts, drops or merges the global limiter and copies
+ * the other state's last-update timestamp. (This operation is executed by the QuotaCache)
+ */
+ public synchronized void update(final QuotaState other) {
+ final QuotaLimiter incoming = other.globalLimiter;
+ if (globalLimiter == NoopQuotaLimiter.get()) {
+ globalLimiter = incoming;
+ } else if (incoming == NoopQuotaLimiter.get()) {
+ globalLimiter = NoopQuotaLimiter.get();
+ } else {
+ globalLimiter = QuotaLimiterFactory.update(globalLimiter, incoming);
+ }
+ lastUpdate = other.lastUpdate;
+ }
+
+ /**
+ * Returns the global limiter and records the current time as the last query time.
+ * @return the quota limiter
+ */
+ public synchronized QuotaLimiter getGlobalLimiter() {
+ setLastQuery(EnvironmentEdgeManager.currentTime());
+ return globalLimiter;
+ }
+
+ /**
+ * Returns the global limiter while leaving the last query time untouched.
+ * @return the quota limiter
+ */
+ synchronized QuotaLimiter getGlobalLimiterWithoutUpdatingLastQuery() {
+ return globalLimiter;
+ }
+
+ public synchronized void setLastQuery(long lastQuery) {
+ this.lastQuery = lastQuery;
+ }
+}
\ No newline at end of file