Posted to commits@zookeeper.apache.org by dd...@apache.org on 2021/01/19 08:07:34 UTC

[zookeeper] branch master updated: ZOOKEEPER-3301: Enforce the quota limit

This is an automated email from the ASF dual-hosted git repository.

ddiederen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 190a227  ZOOKEEPER-3301: Enforce the quota limit
190a227 is described below

commit 190a227aa9d4655ebfe6ba9f5c2da426da8c5d98
Author: maoling <ma...@sina.com>
AuthorDate: Tue Jan 19 08:05:37 2021 +0000

    ZOOKEEPER-3301: Enforce the quota limit
    
    - Thanks for the original work in ZOOKEEPER-1383, ZOOKEEPER-2593, and ZOOKEEPER-451, especially the work in ZOOKEEPER-1383 contributed by [Thawan Kooburat](https://issues.apache.org/jira/secure/ViewProfile.jspa?name=thawan) (I have also signed off his name in the commit message), which also implemented the very valuable throughput quota. We plan to add that in the future as well.
    - `zookeeper.enforceQuota`. When enabled and the client exceeds the total bytes or children count hard quota under a znode, the server will reject the request and reply to the client with a `QuotaExceededException`. The default value is: false.
    - `checkQuota` applies to the `create()` and `setData()` APIs, but not to `delete()`.
    - When users set a quota that is less than the existing stats, we print a helpful warning.
    - The following code in `StatsTrack` is hard to extend:
    
      >             if (split.length != 2) {
      >                 throw new IllegalArgumentException("invalid string " + stats);
      >             }
    
      We use a trick to keep the format extensible, at the cost of slightly strange quota output (`Output quota for /c2 count=-1,bytes=-1=;byteHardLimit=-1;countHardLimit=5`) from `listquota`. Some unit tests cover this.
    - The `checkQuota` logic should live in `PrepRequestProcessor` rather than in `DataTree`.
      Putting `checkQuota` in `DataTree` would have the following two negative effects:
      - 1. When a write request exceeds the quota, the corresponding transaction is still written to the transaction log on disk. This is undesirable, although it does not cause any data inconsistency: when the server restarts, as long as the transaction logs are applied in the same order, the nodes that exceeded the quota will not be applied to the state machine.
      - 2. The client will block waiting for the response, because when `QuotaExceededException` is thrown in the `DataTree`, `rc.stat` will be `null` and `BinaryOutputArchive#writeRecord` will throw an `NPE`.
      - 3. Overall, the pre-check of a write request should be done in `PrepRequestProcessor` (at least before `SyncRequestProcessor`); see `checkACL()` for an example.
    - More detail in [ZOOKEEPER-3301](https://issues.apache.org/jira/browse/ZOOKEEPER-3301).
    - [Added in 2020-02-25] Use `RateLogger` instead of `LOG` so that quota-exceeded logs do not flood the disk.
    - A `TODO` improvement: only users with admin permission should be able to write to `/zookeeper/quota` (just like `/zookeeper/config`), to guard against accidental misuse by other users.
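
The format trick mentioned above (the extra `=` after the bytes field) can be illustrated with a small, self-contained sketch. It assumes the pre-patch parsing logic quoted in this message (split on `,`, then read the text between the first and second `=`) and a simplified version of the new separator-based parsing. The class name `QuotaFormatDemo` and the helpers `parseOld` and `parseNew` are hypothetical names for this example, not part of ZooKeeper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class QuotaFormatDemo {

    // Simplified stand-in for the new parser: "," and ";" both separate pairs.
    private static final Pattern PAIRS_SEPARATOR = Pattern.compile("[,;]+");

    // Mirrors the pre-patch logic: exactly two comma-separated fields, each
    // value taken as the text between the first and second "=".
    public static long[] parseOld(String stats) {
        String[] split = stats.split(",");
        if (split.length != 2) {
            throw new IllegalArgumentException("invalid string " + stats);
        }
        long count = Integer.parseInt(split[0].split("=")[1]);
        long bytes = Long.parseLong(split[1].split("=")[1]);
        return new long[] {count, bytes};
    }

    // Sketch of key=value parsing; a missing or empty value is read as -1
    // (an assumption made for this example).
    public static Map<String, Long> parseNew(String stat) {
        Map<String, Long> values = new HashMap<>();
        if (stat == null || stat.isEmpty()) {
            return values;
        }
        for (String pair : PAIRS_SEPARATOR.split(stat)) {
            String[] kv = pair.split("=");
            String raw = (kv.length > 1 && !kv[1].isEmpty()) ? kv[1] : "-1";
            values.put(kv[0], Long.parseLong(raw));
        }
        return values;
    }

    public static void main(String[] args) {
        String withTrick = "count=-1,bytes=-1=;byteHardLimit=-1;countHardLimit=5";
        String withoutTrick = "count=-1,bytes=-1;byteHardLimit=-1;countHardLimit=5";

        // The old parser stops reading the bytes value at the extra "=".
        long[] old = parseOld(withTrick);
        System.out.println("old parser: count=" + old[0] + ", bytes=" + old[1]);

        // Without the extra "=", the old parser sees "-1;byteHardLimit" as a number.
        try {
            parseOld(withoutTrick);
        } catch (NumberFormatException e) {
            System.out.println("old parser fails without the extra '=': " + e.getMessage());
        }

        // The new-style parser reads every field from the same string.
        System.out.println("countHardLimit=" + parseNew(withTrick).get("countHardLimit"));
    }
}
```

The extra `=` costs one odd-looking character in the `listquota` output, and in exchange a reader using the older two-field logic still gets `count` and `bytes` from a string that carries the new hard-limit fields.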
    
    Author: maoling <ma...@sina.com>
    
    Reviewers: Mate Szalay-Beko <sy...@apache.org>, Damien Diederen <dd...@apache.org>, Enrico Olivelli <eo...@apache.org>, Michael Han <ha...@apache.org>
    
    Closes #934 from maoling/ZOOKEEPER-3301
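
The soft/hard quota semantics described in this commit can be sketched as follows. This is a hypothetical illustration, not the code added to `ZooKeeperServer`: the class `QuotaCheckSketch`, the method `checkQuota`, its parameters, and the nested unchecked `QuotaExceededException` stand-in are all assumptions made for this example. It assumes that `-1` means "no limit", that the hard limit takes priority when both limits are set, and that a request is rejected only when enforcement is enabled and a hard limit is exceeded; in every other over-limit case it only logs a warning.

```java
public class QuotaCheckSketch {

    // Unchecked stand-in for KeeperException.QuotaExceededException
    // (the real exception is a checked KeeperException).
    public static class QuotaExceededException extends RuntimeException {
        public QuotaExceededException(String path) {
            super("Quota has exceeded : " + path);
        }
    }

    /**
     * Returns true if newTotal is within the effective limit, false if the
     * limit is exceeded but only a warning is logged.
     * Throws when a hard limit is exceeded and enforcement is enabled.
     */
    public static boolean checkQuota(String path, long softLimit, long hardLimit,
                                     long newTotal, boolean enforceQuota) {
        boolean hasHardLimit = hardLimit > -1;
        // The hard limit has priority when both limits are set.
        long limit = hasHardLimit ? hardLimit : softLimit;
        if (limit <= -1 || newTotal <= limit) {
            return true; // no limit configured, or still within the limit
        }
        System.err.println("Quota exceeded: " + path + " value=" + newTotal + " limit=" + limit);
        if (hasHardLimit && enforceQuota) {
            throw new QuotaExceededException(path);
        }
        return false;
    }

    public static void main(String[] args) {
        // Soft limit only: the write goes through with a warning.
        System.out.println(checkQuota("/c1", 2, -1, 3, true));

        // Hard limit with enforcement enabled: the write is rejected.
        try {
            checkQuota("/c1", -1, 2, 3, true);
        } catch (QuotaExceededException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running a check of this kind before the request is logged, rather than while applying it to the data tree, is the ordering the commit message argues for.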
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |   6 +
 .../src/main/resources/markdown/zookeeperCLI.md    |  37 +-
 .../src/main/resources/markdown/zookeeperQuotas.md |   3 +
 .../java/org/apache/zookeeper/KeeperException.java |  19 +
 .../src/main/java/org/apache/zookeeper/Quotas.java |  21 +-
 .../main/java/org/apache/zookeeper/StatsTrack.java | 171 +++++++--
 .../apache/zookeeper/cli/CliWrapperException.java  |   2 +
 .../org/apache/zookeeper/cli/DelQuotaCommand.java  | 100 ++---
 .../org/apache/zookeeper/cli/ListQuotaCommand.java |  37 +-
 .../org/apache/zookeeper/cli/SetQuotaCommand.java  | 210 ++++++++---
 .../org/apache/zookeeper/common/StringUtils.java   |  18 +
 .../java/org/apache/zookeeper/server/DataTree.java |  80 ++--
 .../zookeeper/server/PrepRequestProcessor.java     |   2 +
 .../apache/zookeeper/server/ZooKeeperServer.java   | 131 +++++++
 .../org/apache/zookeeper/server/DataTreeTest.java  |   4 +-
 .../apache/zookeeper/test/EnforceQuotaTest.java    |  87 +++++
 .../org/apache/zookeeper/test/QuorumQuotaTest.java |  12 +-
 .../java/org/apache/zookeeper/test/QuotasTest.java |  50 +++
 .../org/apache/zookeeper/test/StatsTrackTest.java  | 135 +++++++
 .../apache/zookeeper/test/ZooKeeperQuotaTest.java  | 406 +++++++++++++++++++--
 20 files changed, 1313 insertions(+), 218 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 129cf19..82bcd65 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -978,6 +978,12 @@ property, when available, is noted below.
     Does not affect the limit defined by *flushDelay*.
     Default is 1000.
 
+* *enforceQuota* :
+    (Java system property: **zookeeper.enforceQuota**)
+    **New in 3.7.0:**
+    Enforce the quota check. When enabled and the client exceeds the total bytes or children count hard quota under a znode, the server will reject the request and reply the client a `QuotaExceededException` by force.
+    The default value is: false. Exploring [quota feature](http://zookeeper.apache.org/doc/current/zookeeperQuotas.html) for more details.
+
 * *requestThrottleLimit* :
     (Java system property: **zookeeper.request_throttle_max_requests**)
     **New in 3.6.0:**
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperCLI.md b/zookeeper-docs/src/main/resources/markdown/zookeeperCLI.md
index 205c3ba..8113e6c 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperCLI.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperCLI.md
@@ -42,7 +42,7 @@ ZooKeeper -server host:port cmd args
 	create [-s] [-e] [-c] [-t ttl] path [data] [acl]
 	delete [-v version] path
 	deleteall path
-	delquota [-n|-b] path
+	delquota [-n|-b|-N|-B] path
 	get [-s] [-w] path
 	getAcl [-s] path
 	getAllChildrenNumber path
@@ -57,7 +57,7 @@ ZooKeeper -server host:port cmd args
 	removewatches path [-c|-d|-a] [-l]
 	set [-s] [-v version] path data
 	setAcl [-s] [-v version] [-R] path acl
-	setquota -n|-b val path
+	setquota -n|-b|-N|-B val path
 	stat [-w] path
 	sync path
 	version
@@ -187,6 +187,11 @@ Delete the quota under a path
 [zkshell: 2] listquota /quota_test
 	absolute path is /zookeeper/quota/quota_test/zookeeper_limits
 	quota for /quota_test does not exist.
+[zkshell: 3] delquota -n /c1
+[zkshell: 4] delquota -N /c2
+[zkshell: 5] delquota -b /c3
+[zkshell: 6] delquota -B /c4
+
 ```
 ## get
 Get the data of the specific path
@@ -281,10 +286,10 @@ Showing the history about the recent 11 commands that you have executed
 Listing the quota of one path
 
 ```bash
-[zkshell: 1] listquota /quota_test
-	absolute path is /zookeeper/quota/quota_test/zookeeper_limits
-	Output quota for /quota_test count=2,bytes=-1
-	Output stat for /quota_test count=4,bytes=0
+[zkshell: 1] listquota /c1
+             absolute path is /zookeeper/quota/c1/zookeeper_limits
+             Output quota for /c1 count=-1,bytes=-1=;byteHardLimit=-1;countHardLimit=2
+             Output stat for /c1 count=4,bytes=0
 ```
 
 ## ls
@@ -497,6 +502,26 @@ Set the quota in one path.
 [zkshell: 23] set /brokers "I_love_zookeeper"
 # Notice:don't have a hard constraint,just log the warning info
 	WARN  [CommitProcWorkThread-7:DataTree@379] - Quota exceeded: /brokers bytes=4206 limit=5
+
+# -N count Hard quota
+[zkshell: 3] create /c1
+Created /c1
+[zkshell: 4] setquota -N 2 /c1
+[zkshell: 5] listquota /c1
+absolute path is /zookeeper/quota/c1/zookeeper_limits
+Output quota for /c1 count=-1,bytes=-1=;byteHardLimit=-1;countHardLimit=2
+Output stat for /c1 count=2,bytes=0
+[zkshell: 6] create /c1/ch-3
+Count Quota has exceeded : /c1/ch-3
+
+# -B byte Hard quota
+[zkshell: 3] create /c2
+[zkshell: 4] setquota -B 4 /c2
+[zkshell: 5] set /c2 "foo"
+[zkshell: 6] set /c2 "foo-bar"
+Bytes Quota has exceeded : /c2
+[zkshell: 7] get /c2
+foo
 ```
 
 ## stat
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperQuotas.md b/zookeeper-docs/src/main/resources/markdown/zookeeperQuotas.md
index 11226c3..72864c3 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperQuotas.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperQuotas.md
@@ -67,6 +67,9 @@ according to specific circumstances.
 
 - Users cannot set the quota on the path under **/zookeeper/quota**
 
+- The quota supports the soft and hard quota. The soft quota just logs the warning info when exceeding the quota, but the hard quota
+also throws a `QuotaExceededException`. When setting soft and hard quota on the same path, the hard quota has the priority.
+
 <a name="Listing+Quotas"></a>
 
 ### Listing Quotas
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
index 9469d64..4590e0f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
@@ -148,6 +148,8 @@ public abstract class KeeperException extends Exception {
             return new SessionClosedRequireAuthException();
         case REQUESTTIMEOUT:
             return new RequestTimeoutException();
+        case QUOTAEXCEEDED:
+            return new QuotaExceededException();
         case THROTTLEDOP:
             return new ThrottledOpException();
         case OK:
@@ -408,6 +410,8 @@ public abstract class KeeperException extends Exception {
          *  required  authentication scheme or configured but authentication failed
          *  (i.e. wrong credential used.). */
         SESSIONCLOSEDREQUIRESASLAUTH(-124),
+        /** Exceeded the quota that was set on the path.*/
+        QUOTAEXCEEDED(-125),
         /** Operation was throttled and not executed at all. This error code indicates that zookeeper server
          *  is under heavy load and can't process incoming requests at full speed; please retry with back off.
          */
@@ -502,6 +506,8 @@ public abstract class KeeperException extends Exception {
             return "Reconfig is disabled";
         case SESSIONCLOSEDREQUIRESASLAUTH:
             return "Session closed because client failed to authenticate";
+        case QUOTAEXCEEDED:
+            return "Quota has exceeded";
         case THROTTLEDOP:
             return "Op throttled due to high load";
         default:
@@ -950,6 +956,19 @@ public abstract class KeeperException extends Exception {
     }
 
     /**
+     * @see Code#QUOTAEXCEEDED
+     */
+    @InterfaceAudience.Public
+    public static class QuotaExceededException extends KeeperException {
+        public QuotaExceededException() {
+            super(Code.QUOTAEXCEEDED);
+        }
+        public QuotaExceededException(String path) {
+            super(Code.QUOTAEXCEEDED, path);
+        }
+    }
+
+    /**
      * @see Code#THROTTLEDOP
      */
     public static class ThrottledOpException extends KeeperException {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/Quotas.java b/zookeeper-server/src/main/java/org/apache/zookeeper/Quotas.java
index 031ff36..eb83cfe 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/Quotas.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/Quotas.java
@@ -48,9 +48,19 @@ public class Quotas {
      * return the quota path associated with this
      * prefix
      * @param path the actual path in zookeeper.
-     * @return the limit quota path
+     * @return the quota path
      */
     public static String quotaPath(String path) {
+        return quotaZookeeper + path;
+    }
+
+    /**
+     * return the limit quota path associated with this
+     * prefix
+     * @param path the actual path in zookeeper.
+     * @return the limit quota path
+     */
+    public static String limitPath(String path) {
         return quotaZookeeper + path + "/" + limitNode;
     }
 
@@ -64,4 +74,13 @@ public class Quotas {
         return quotaZookeeper + path + "/" + statNode;
     }
 
+    /**
+     * return the real path associated with this
+     * quotaPath.
+     * @param quotaPath the quotaPath which's started with /zookeeper/quota
+     * @return the real path associated with this quotaPath.
+     */
+    public static String trimQuotaPath(String quotaPath) {
+        return quotaPath.substring(quotaZookeeper.length());
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/StatsTrack.java b/zookeeper-server/src/main/java/org/apache/zookeeper/StatsTrack.java
index 4bd88e3..aaea1d9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/StatsTrack.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/StatsTrack.java
@@ -18,48 +18,70 @@
 
 package org.apache.zookeeper;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.zookeeper.common.StringUtils;
+
 /**
  * a class that represents the stats associated with quotas
  */
 public class StatsTrack {
 
-    private int count;
-    private long bytes;
-    private String countStr = "count";
-    private String byteStr = "bytes";
+    private static final String countStr = "count";
+    private static final String countHardLimitStr = "countHardLimit";
+
+    private static final String byteStr = "bytes";
+    private static final String byteHardLimitStr = "byteHardLimit";
+
+    private final Map<String, Long> stats = new HashMap<>();
+    private static final Pattern PAIRS_SEPARATOR = Pattern.compile("[,;]+");
 
     /**
      * a default constructor for
      * stats
      */
     public StatsTrack() {
-        this(null);
+        this("");
     }
+
     /**
-     * the stat string should be of the form count=int,bytes=long
-     * if stats is called with null the count and bytes are initialized
-     * to -1.
-     * @param stats the stat string to be intialized with
+     *
+     * @param stat the byte[] stat to be initialized with
      */
-    public StatsTrack(String stats) {
-        if (stats == null) {
-            stats = "count=-1,bytes=-1";
+    public StatsTrack(byte[] stat) {
+        this(new String(stat, StandardCharsets.UTF_8));
+    }
+
+    /**
+     * the stat string should be of the form key1str=long,key2str=long,..
+     * where either , or ; are valid separators
+     * uninitialized values are returned as -1
+     * @param stat the stat string to be initialized with
+     */
+    public StatsTrack(String stat) {
+        this.stats.clear();
+        if (stat == null || stat.length() == 0) {
+            return;
         }
-        String[] split = stats.split(",");
-        if (split.length != 2) {
-            throw new IllegalArgumentException("invalid string " + stats);
+        String[] keyValuePairs = PAIRS_SEPARATOR.split(stat);
+        for (String keyValuePair : keyValuePairs) {
+            String[] kv = keyValuePair.split("=");
+            this.stats.put(kv[0], Long.parseLong(StringUtils.isEmpty(kv[1]) ? "-1" : kv[1]));
         }
-        count = Integer.parseInt(split[0].split("=")[1]);
-        bytes = Long.parseLong(split[1].split("=")[1]);
     }
 
+
     /**
      * get the count of nodes allowed as part of quota
      *
      * @return the count as part of this string
      */
-    public int getCount() {
-        return this.count;
+    public long getCount() {
+        return getValue(countStr);
     }
 
     /**
@@ -68,8 +90,26 @@ public class StatsTrack {
      * @param count
      *            the count to set with
      */
-    public void setCount(int count) {
-        this.count = count;
+    public void setCount(long count) {
+        setValue(countStr, count);
+    }
+
+    /**
+     * get the count of nodes allowed as part of quota (hard limit)
+     *
+     * @return the count as part of this string
+     */
+    public long getCountHardLimit() {
+        return getValue(countHardLimitStr);
+    }
+
+    /**
+     * set the count hard limit
+     *
+     * @param count the count limit to set
+     */
+    public void setCountHardLimit(long count) {
+        setValue(countHardLimitStr, count);
     }
 
     /**
@@ -78,24 +118,101 @@ public class StatsTrack {
      * @return the bytes as part of this string
      */
     public long getBytes() {
-        return this.bytes;
+        return getValue(byteStr);
     }
 
     /**
-     * set teh bytes for this stat tracker.
+     * set the bytes for this stat tracker.
      *
      * @param bytes
      *            the bytes to set with
      */
     public void setBytes(long bytes) {
-        this.bytes = bytes;
+        setValue(byteStr, bytes);
+    }
+
+    /**
+     * get the count of bytes allowed as part of quota (hard limit)
+     *
+     * @return the bytes as part of this string
+     */
+    public long getByteHardLimit() {
+        return getValue(byteHardLimitStr);
+    }
+
+    /**
+     * set the byte hard limit
+     *
+     * @param bytes the byte limit to set
+     */
+    public void setByteHardLimit(long bytes) {
+        setValue(byteHardLimitStr, bytes);
+    }
+
+    /**
+     * get helper to lookup a given key
+     *
+     * @param key the key to lookup
+     * @return key's value or -1 if it doesn't exist
+     */
+    private long getValue(String key) {
+        Long val = this.stats.get(key);
+        return val == null ? -1 : val.longValue();
+    }
+
+    /**
+     * set helper to set the value for the specified key
+     *
+     * @param key   the key to set
+     * @param value the value to set
+     */
+    private void setValue(String key, long value) {
+        this.stats.put(key, value);
     }
 
-    @Override
     /*
      * returns the string that maps to this stat tracking.
-     */ public String toString() {
-        return countStr + "=" + count + "," + byteStr + "=" + bytes;
+     *
+     * Builds a string of the form
+     * "count=4,bytes=5=;countHardLimit=10;byteHardLimit=10"
+     *
+     * This string is slightly hacky to preserve compatibility with 3.4.3 and
+     * older parser. In particular, count must be first, bytes must be second,
+     * all new fields must use a separator that is not a "," (so, ";"), and the
+     * seemingly spurious "=" after the bytes field is essential to allowing
+     * it to be parseable by the old parsing code.
+     */
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        ArrayList<String> keys = new ArrayList<>(stats.keySet());
+
+        // Special handling for count=,byte= to enforce them coming first
+        // for backwards compatibility
+        keys.remove(countStr);
+        keys.remove(byteStr);
+        buf.append(countStr);
+        buf.append("=");
+        buf.append(getCount());
+        buf.append(",");
+        buf.append(byteStr);
+        buf.append("=");
+        buf.append(getBytes());
+        if (!keys.isEmpty()) {
+            // Add extra = to trick old parsing code so it will ignore new flags
+            buf.append("=");
+            Collections.sort(keys);
+            for (String key : keys) {
+                buf.append(";");
+                buf.append(key);
+                buf.append("=");
+                buf.append(stats.get(key));
+            }
+        }
+        return buf.toString();
     }
 
+    public byte[] getStatsBytes() {
+        return toString().getBytes(StandardCharsets.UTF_8);
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java
index fcf46d9..8095a40 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java
@@ -54,6 +54,8 @@ public class CliWrapperException extends CliException {
                 return "No quorum of new config is connected and "
                        + "up-to-date with the leader of last commmitted config - try invoking reconfiguration after "
                        + "new servers are connected and synced";
+            } else if (keeperException instanceof KeeperException.QuotaExceededException) {
+                return "Quota has exceeded : " + keeperException.getPath();
             }
         }
         return cause.getMessage();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/DelQuotaCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/DelQuotaCommand.java
index 70c0be3..d84c830 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/DelQuotaCommand.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/DelQuotaCommand.java
@@ -18,8 +18,6 @@
 
 package org.apache.zookeeper.cli;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import java.io.IOException;
 import java.util.List;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
@@ -43,11 +41,14 @@ public class DelQuotaCommand extends CliCommand {
     private CommandLine cl;
 
     public DelQuotaCommand() {
-        super("delquota", "[-n|-b] path");
+        super("delquota", "[-n|-b|-N|-B] path");
 
         OptionGroup og1 = new OptionGroup();
-        og1.addOption(new Option("b", false, "bytes quota"));
-        og1.addOption(new Option("n", false, "num quota"));
+        og1.addOption(new Option("n", false, "num soft quota"));
+        og1.addOption(new Option("b", false, "bytes soft quota"));
+        og1.addOption(new Option("N", false, "num hard quota"));
+        og1.addOption(new Option("B", false, "bytes hard quota"));
+
         options.addOptionGroup(og1);
     }
 
@@ -69,22 +70,35 @@ public class DelQuotaCommand extends CliCommand {
 
     @Override
     public boolean exec() throws CliException {
-        //if neither option -n or -b is specified, we delete
-        // the quota node for this node.
         String path = args[1];
+        // Use a StatsTrack object to pass in to delQuota which quotas
+        // to delete by setting them to 1 as a flag.
+        StatsTrack quota = new StatsTrack();
+        if (cl.hasOption("n")) {
+            quota.setCount(1);
+        }
+        if (cl.hasOption("b")) {
+            quota.setBytes(1);
+        }
+        if (cl.hasOption("N")) {
+            quota.setCountHardLimit(1);
+        }
+        if (cl.hasOption("B")) {
+            quota.setByteHardLimit(1);
+        }
+
+        boolean flagSet = (cl.hasOption("n") || cl.hasOption("N")
+                || cl.hasOption("b") || cl.hasOption("B"));
         try {
-            if (cl.hasOption("b")) {
-                delQuota(zk, path, true, false);
-            } else if (cl.hasOption("n")) {
-                delQuota(zk, path, false, true);
-            } else if (args.length == 2) {
-                // we don't have an option specified.
-                // just delete whole quota node
-                delQuota(zk, path, true, true);
-            }
-        } catch (KeeperException | InterruptedException | IOException ex) {
+            delQuota(zk, path, flagSet ? quota : null);
+        } catch (IllegalArgumentException ex) {
+            throw new MalformedPathException(ex.getMessage());
+        } catch (KeeperException.NoNodeException ne) {
+            err.println("quota for " + path + " does not exist.");
+        } catch (KeeperException | InterruptedException ex) {
             throw new CliWrapperException(ex);
         }
+
         return false;
     }
 
@@ -93,20 +107,16 @@ public class DelQuotaCommand extends CliCommand {
      *
      * @param zk the zookeeper client
      * @param path the path to delete quota for
-     * @param bytes true if number of bytes needs to be unset
-     * @param numNodes true if number of nodes needs to be unset
+     * @param quota the quotas to delete (set to 1), null to delete all
      * @return true if quota deletion is successful
      * @throws KeeperException
-     * @throws IOException
+     * @throws MalformedPathException
      * @throws InterruptedException
      */
-    public static boolean delQuota(
-        ZooKeeper zk,
-        String path,
-        boolean bytes,
-        boolean numNodes) throws KeeperException, IOException, InterruptedException, MalformedPathException {
-        String parentPath = Quotas.quotaZookeeper + path;
-        String quotaPath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode;
+    public static boolean delQuota(ZooKeeper zk, String path, StatsTrack quota)
+            throws KeeperException, InterruptedException, MalformedPathException {
+        String parentPath = Quotas.quotaPath(path);
+        String quotaPath = Quotas.limitPath(path);
         if (zk.exists(quotaPath, false) == null) {
             System.out.println("Quota does not exist for " + path);
             return true;
@@ -117,17 +127,11 @@ public class DelQuotaCommand extends CliCommand {
         } catch (IllegalArgumentException ex) {
             throw new MalformedPathException(ex.getMessage());
         } catch (KeeperException.NoNodeException ne) {
-            System.err.println("quota does not exist for " + path);
-            return true;
+            throw new KeeperException.NoNodeException(ne.getMessage());
         }
-        StatsTrack strack = new StatsTrack(new String(data, UTF_8));
-        if (bytes && !numNodes) {
-            strack.setBytes(-1L);
-            zk.setData(quotaPath, strack.toString().getBytes(UTF_8), -1);
-        } else if (!bytes && numNodes) {
-            strack.setCount(-1);
-            zk.setData(quotaPath, strack.toString().getBytes(UTF_8), -1);
-        } else if (bytes && numNodes) {
+        StatsTrack strack = new StatsTrack(data);
+
+        if (quota == null) {
             // delete till you can find a node with more than
             // one child
             List<String> children = zk.getChildren(parentPath, false);
@@ -137,7 +141,23 @@ public class DelQuotaCommand extends CliCommand {
             }
             // cut the tree till their is more than one child
             trimProcQuotas(zk, parentPath);
+        } else {
+            if (quota.getCount() > 0) {
+                strack.setCount(-1);
+            }
+            if (quota.getBytes() > 0) {
+                strack.setBytes(-1L);
+            }
+            if (quota.getCountHardLimit() > 0) {
+                strack.setCountHardLimit(-1);
+            }
+            if (quota.getByteHardLimit() > 0) {
+                strack.setByteHardLimit(-1L);
+            }
+
+            zk.setData(quotaPath, strack.getStatsBytes(), -1);
         }
+
         return true;
     }
 
@@ -149,12 +169,10 @@ public class DelQuotaCommand extends CliCommand {
      * unwanted parent in the path.
      * @return true if successful
      * @throws KeeperException
-     * @throws IOException
      * @throws InterruptedException
      */
-    private static boolean trimProcQuotas(
-        ZooKeeper zk,
-        String path) throws KeeperException, IOException, InterruptedException {
+    private static boolean trimProcQuotas(ZooKeeper zk, String path)
+            throws KeeperException, InterruptedException {
         if (Quotas.quotaZookeeper.equals(path)) {
             return true;
         }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ListQuotaCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ListQuotaCommand.java
index 4ae1d22..fe5fe6b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ListQuotaCommand.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/ListQuotaCommand.java
@@ -18,7 +18,8 @@
 
 package org.apache.zookeeper.cli;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
@@ -26,6 +27,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Quotas;
 import org.apache.zookeeper.StatsTrack;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
 /**
@@ -60,16 +62,18 @@ public class ListQuotaCommand extends CliCommand {
     @Override
     public boolean exec() throws CliException {
         String path = args[1];
-        String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode;
+        String absolutePath = Quotas.limitPath(path);
         try {
             err.println("absolute path is " + absolutePath);
-            Stat stat = new Stat();
-            byte[] data = zk.getData(absolutePath, false, stat);
-            StatsTrack st = new StatsTrack(new String(data, UTF_8));
-            out.println("Output quota for " + path + " " + st);
-
-            data = zk.getData(Quotas.quotaZookeeper + path + "/" + Quotas.statNode, false, stat);
-            out.println("Output stat for " + path + " " + new StatsTrack(new String(data, UTF_8)));
+            List<StatsTrack> statsTracks = listQuota(zk, path);
+            for (int i = 0; i < statsTracks.size(); i++) {
+                StatsTrack st = statsTracks.get(i);
+                if (i == 0) {
+                    out.println("Output quota for " + path + " " + st.toString());
+                } else {
+                    out.println("Output stat for " + path + " " + st.toString());
+                }
+            }
         } catch (IllegalArgumentException ex) {
             throw new MalformedPathException(ex.getMessage());
         } catch (KeeperException.NoNodeException ne) {
@@ -81,4 +85,19 @@ public class ListQuotaCommand extends CliCommand {
         return false;
     }
 
+    // @VisibleForTesting
+    public static List<StatsTrack> listQuota(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
+        List<StatsTrack> statsTracks = new ArrayList<>();
+        Stat stat = new Stat();
+        byte[] data = zk.getData(Quotas.limitPath(path), false, stat);
+        StatsTrack st = new StatsTrack(data);
+        statsTracks.add(st);
+
+        data = zk.getData(Quotas.statPath(path), false, stat);
+        st = new StatsTrack(data);
+        statsTracks.add(st);
+
+        return statsTracks;
+    }
+
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/SetQuotaCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/SetQuotaCommand.java
index f9c17f5..f564398 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/SetQuotaCommand.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/SetQuotaCommand.java
@@ -18,7 +18,6 @@
 
 package org.apache.zookeeper.cli;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.cli.CommandLine;
@@ -49,11 +48,14 @@ public class SetQuotaCommand extends CliCommand {
     private CommandLine cl;
 
     public SetQuotaCommand() {
-        super("setquota", "-n|-b val path");
+        super("setquota", "-n|-b|-N|-B val path");
 
         OptionGroup og1 = new OptionGroup();
-        og1.addOption(new Option("b", true, "bytes quota"));
-        og1.addOption(new Option("n", true, "num quota"));
+        og1.addOption(new Option("n", true, "num soft quota"));
+        og1.addOption(new Option("b", true, "bytes soft quota"));
+        og1.addOption(new Option("N", true, "num hard quota"));
+        og1.addOption(new Option("B", true, "bytes hard quota"));
+
         og1.setRequired(true);
         options.addOptionGroup(og1);
     }
@@ -83,36 +85,95 @@ public class SetQuotaCommand extends CliCommand {
             return false;
         }
 
-        if (cl.hasOption("b")) {
-            // we are setting the bytes quota
-            long bytes = Long.parseLong(cl.getOptionValue("b"));
-            try {
-                createQuota(zk, path, bytes, -1);
-            } catch (KeeperException | InterruptedException | IllegalArgumentException ex) {
-                throw new CliWrapperException(ex);
-            }
-        } else if (cl.hasOption("n")) {
-            // we are setting the num quota
-            int numNodes = Integer.parseInt(cl.getOptionValue("n"));
+        StatsTrack quota = new StatsTrack();
+        quota.setCount(-1);
+        quota.setBytes(-1L);
+        quota.setCountHardLimit(-1);
+        quota.setByteHardLimit(-1L);
+
+        if (!checkOptionValue(quota)) {
+            return false;
+        }
+
+        boolean flagSet = (cl.hasOption("n") || cl.hasOption("N")
+                || cl.hasOption("b") || cl.hasOption("B"));
+        if (flagSet) {
             try {
-                createQuota(zk, path, -1L, numNodes);
-            } catch (KeeperException | InterruptedException | IllegalArgumentException ex) {
+                createQuota(zk, path, quota);
+            } catch (IllegalArgumentException ex) {
+                throw new MalformedPathException(ex.getMessage());
+            } catch (KeeperException | InterruptedException ex) {
                 throw new CliWrapperException(ex);
             }
         } else {
-            throw new MalformedCommandException(getUsageStr());
+            err.println(getUsageStr());
         }
 
         return false;
     }
 
-    public static boolean createQuota(
-        ZooKeeper zk,
-        String path,
-        long bytes,
-        int numNodes) throws KeeperException, InterruptedException, IllegalArgumentException, MalformedPathException {
+    private boolean checkOptionValue(StatsTrack quota) {
+
+        try {
+            if (cl.hasOption("n")) {
+                // we are setting the num quota
+                int count = Integer.parseInt(cl.getOptionValue("n"));
+                if (count > 0) {
+                    quota.setCount(count);
+                } else {
+                    err.println("the num quota must be greater than zero");
+                    return false;
+                }
+            }
+            if (cl.hasOption("b")) {
+                // we are setting the bytes quota
+                long bytes = Long.parseLong(cl.getOptionValue("b"));
+                if (bytes >= 0) {
+                    quota.setBytes(bytes);
+                } else {
+                    err.println("the bytes quota must be greater than or equal to zero");
+                    return false;
+                }
+            }
+            if (cl.hasOption("N")) {
+                // we are setting the num hard quota
+                int count = Integer.parseInt(cl.getOptionValue("N"));
+                if (count > 0) {
+                    quota.setCountHardLimit(count);
+                } else {
+                    err.println("the num hard quota must be greater than zero");
+                    return false;
+                }
+            }
+            if (cl.hasOption("B")) {
+                // we are setting the byte hard quota
+                long bytes = Long.parseLong(cl.getOptionValue("B"));
+                if (bytes >= 0) {
+                    quota.setByteHardLimit(bytes);
+                } else {
+                    err.println("the bytes hard quota must be greater than or equal to zero");
+                    return false;
+                }
+            }
+        } catch (NumberFormatException e) {
+            err.println("NumberFormatException happens when parsing the option value");
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * this method creates a quota node for the path
+     * @param zk the ZooKeeper client
+     * @param path the path for which quota needs to be created
+     * @param quota the quotas
+     * @return true if it is successful and false if not.
+     */
+    public static boolean createQuota(ZooKeeper zk, String path, StatsTrack quota)
+            throws KeeperException, InterruptedException, MalformedPathException {
         // check if the path exists. We cannot create
-        // quota for a path that already exists in zookeeper
+        // quota for a path that doesn't exist in zookeeper
         // for now.
         Stat initStat;
         try {
@@ -158,39 +219,77 @@ public class SetQuotaCommand extends CliCommand {
         for (int i = 1; i < splits.length; i++) {
             sb.append("/").append(splits[i]);
             quotaPath = sb.toString();
-            try {
-                zk.create(quotaPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            } catch (KeeperException.NodeExistsException ne) {
-                //do nothing
+            if (zk.exists(quotaPath, false) == null) {
+                try {
+                    zk.create(quotaPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                } catch (KeeperException.NodeExistsException ne) {
+                    //do nothing
+                }
             }
         }
         String statPath = quotaPath + "/" + Quotas.statNode;
         quotaPath = quotaPath + "/" + Quotas.limitNode;
-        StatsTrack strack = new StatsTrack(null);
-        strack.setBytes(bytes);
-        strack.setCount(numNodes);
-        try {
-            zk.create(quotaPath, strack.toString().getBytes(UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            StatsTrack stats = new StatsTrack(null);
-            stats.setBytes(0L);
+        byte[] data;
+
+        if (zk.exists(quotaPath, false) == null) {
+            zk.create(quotaPath, quota.getStatsBytes(),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            StatsTrack stats = new StatsTrack();
             stats.setCount(0);
-            zk.create(statPath, stats.toString().getBytes(UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        } catch (KeeperException.NodeExistsException ne) {
-            byte[] data = zk.getData(quotaPath, false, new Stat());
-            StatsTrack strackC = new StatsTrack(new String(data, UTF_8));
-            if (bytes != -1L) {
-                strackC.setBytes(bytes);
+            stats.setBytes(0L);
+
+            zk.create(statPath, stats.getStatsBytes(),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            data = zk.getData(quotaPath, false, new Stat());
+            StatsTrack quotaStrack = new StatsTrack(data);
+
+            data = zk.getData(statPath, false, new Stat());
+            StatsTrack statStrack = new StatsTrack(data);
+            checkQuota(quotaStrack, statStrack);
+
+        } else {
+            data = zk.getData(quotaPath, false, new Stat());
+            StatsTrack quotaStrack = new StatsTrack(data);
+
+            if (quota.getCount() > -1) {
+                quotaStrack.setCount(quota.getCount());
+            }
+            if (quota.getBytes() > -1L) {
+                quotaStrack.setBytes(quota.getBytes());
+            }
+            if (quota.getCountHardLimit() > -1) {
+                quotaStrack.setCountHardLimit(quota.getCountHardLimit());
             }
-            if (numNodes != -1) {
-                strackC.setCount(numNodes);
+            if (quota.getByteHardLimit() > -1L) {
+                quotaStrack.setByteHardLimit(quota.getByteHardLimit());
             }
-            zk.setData(quotaPath, strackC.toString().getBytes(UTF_8), -1);
+
+            data = zk.getData(statPath, false, new Stat());
+            StatsTrack statStrack = new StatsTrack(data);
+            checkQuota(quotaStrack, statStrack);
+
+            zk.setData(quotaPath, quotaStrack.getStatsBytes(), -1);
         }
+
         return true;
     }
 
+    private static void checkQuota(StatsTrack quotaStrack, StatsTrack statStrack) {
+        if ((quotaStrack.getCount() > -1 && quotaStrack.getCount() < statStrack.getCount()) || (quotaStrack.getCountHardLimit() > -1
+                && quotaStrack.getCountHardLimit() < statStrack.getCount())) {
+            System.out.println("[Warning]: the count quota you create is less than the existing count:" + statStrack.getCount());
+        }
+        if ((quotaStrack.getBytes() > -1 && quotaStrack.getBytes() < statStrack.getBytes()) || (quotaStrack.getByteHardLimit() > -1
+                && quotaStrack.getByteHardLimit() < statStrack.getBytes())) {
+            System.out.println("[Warning]: the bytes quota you create is less than the existing bytes:" + statStrack.getBytes());
+        }
+    }
+
     private static void checkIfChildQuota(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
-        String realPath = Quotas.quotaZookeeper + path;
+        String realPath = Quotas.quotaPath(path);
 
         try {
             ZKUtil.visitSubTreeDFS(zk, realPath, false, (rc, quotaPath, ctx, name) -> {
@@ -209,7 +308,7 @@ public class SetQuotaCommand extends CliCommand {
                 }
                 for (String child : children) {
                     if (!quotaPath.equals(Quotas.quotaZookeeper + path) && Quotas.limitNode.equals(child)) {
-                        throw new IllegalArgumentException(path + " has a child " + quotaPath.substring(Quotas.quotaZookeeper.length()) + " which has a quota");
+                        throw new IllegalArgumentException(path + " has a child " + Quotas.trimQuotaPath(quotaPath) + " which has a quota");
                     }
                 }
             });
@@ -221,13 +320,13 @@ public class SetQuotaCommand extends CliCommand {
     private static void checkIfParentQuota(ZooKeeper zk, String path) throws InterruptedException, KeeperException {
         final String[] splits = path.split("/");
         String quotaPath = Quotas.quotaZookeeper;
-        for (String str : splits) {
-            if (str.length() == 0) {
-                // this should only be for the beginning of the path
-                // i.e. "/..." - split(path)[0] is empty string before first '/'
-                continue;
-            }
-            quotaPath += "/" + str;
+
+        StringBuilder sb = new StringBuilder();
+        sb.append(quotaPath);
+        for (int i = 1; i < splits.length - 1; i++) {
+            sb.append("/");
+            sb.append(splits[i]);
+            quotaPath = sb.toString();
             List<String> children = null;
             try {
                 children = zk.getChildren(quotaPath, false);
@@ -239,11 +338,10 @@ public class SetQuotaCommand extends CliCommand {
                 return;
             }
             for (String child : children) {
-                if (!quotaPath.equals(Quotas.quotaZookeeper + path) && Quotas.limitNode.equals(child)) {
-                    throw new IllegalArgumentException(path + " has a parent " + quotaPath.substring(Quotas.quotaZookeeper.length()) + " which has a quota");
+                if (!quotaPath.equals(Quotas.quotaPath(path)) && Quotas.limitNode.equals(child)) {
+                    throw new IllegalArgumentException(path + " has a parent " + Quotas.trimQuotaPath(quotaPath) + " which has a quota");
                 }
             }
         }
     }
-
 }
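
The update branch of createQuota() above overwrites a stored limit only when the requested value is greater than -1. A minimal, self-contained sketch of that merge rule follows. The Limits class and the merge() helper are hypothetical stand-ins for illustration only, not part of this commit or of StatsTrack.

```java
public class QuotaMergeSketch {

    /** Hypothetical stand-in for the four limit fields carried by StatsTrack. */
    static final class Limits {
        long count = -1;
        long bytes = -1;
        long countHardLimit = -1;
        long byteHardLimit = -1;
    }

    /** Copies into 'existing' only the fields that 'requested' actually sets (> -1). */
    static Limits merge(Limits existing, Limits requested) {
        if (requested.count > -1) {
            existing.count = requested.count;
        }
        if (requested.bytes > -1) {
            existing.bytes = requested.bytes;
        }
        if (requested.countHardLimit > -1) {
            existing.countHardLimit = requested.countHardLimit;
        }
        if (requested.byteHardLimit > -1) {
            existing.byteHardLimit = requested.byteHardLimit;
        }
        return existing;
    }

    public static void main(String[] args) {
        Limits existing = new Limits();
        existing.count = 10;           // a soft count quota is already stored
        Limits requested = new Limits();
        requested.countHardLimit = 5;  // only the hard count quota is requested
        Limits merged = merge(existing, requested);
        System.out.println(merged.count + " " + merged.countHardLimit); // prints "10 5"
    }
}
```

Under this rule a value of -1 never overwrites a stored limit, so in this sketch a second call cannot unset a limit that is already in place.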
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/StringUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/StringUtils.java
index 8e89abe..7723743 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/StringUtils.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/StringUtils.java
@@ -74,4 +74,22 @@ public class StringUtils {
         return s == null || s.trim().isEmpty();
     }
 
+    /**
+     * <p>Checks if a String is empty ("") or null.</p>
+     *
+     * <pre>
+     * StringUtils.isEmpty(null)      = true
+     * StringUtils.isEmpty("")        = true
+     * StringUtils.isEmpty(" ")       = false
+     * StringUtils.isEmpty("bob")     = false
+     * StringUtils.isEmpty("  bob  ") = false
+     * </pre>
+     *
+     * @param str  the String to check, may be null
+     * @return <code>true</code> if the String is empty or null
+     */
+    public static boolean isEmpty(String str) {
+        return str == null || str.length() == 0;
+    }
+
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index afb549f..7e2e843 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -18,7 +18,6 @@
 
 package org.apache.zookeeper.server;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -372,56 +371,32 @@ public class DataTree {
     }
 
     /**
-     * update the count/count of bytes of this stat datanode
+     * update the count/bytes of this stat data node
      *
      * @param lastPrefix
-     *            the path of the node that is quotaed.
+     *            the path of the node that has a quota.
      * @param bytesDiff
      *            the diff to be added to number of bytes
      * @param countDiff
      *            the diff to be added to the count
      */
-    public void updateCountBytes(String lastPrefix, long bytesDiff, int countDiff) {
-        String statNode = Quotas.statPath(lastPrefix);
-        DataNode node = nodes.get(statNode);
+    public void updateQuotaStat(String lastPrefix, long bytesDiff, int countDiff) {
 
-        StatsTrack updatedStat = null;
-        if (node == null) {
+        String statNodePath = Quotas.statPath(lastPrefix);
+        DataNode statNode = nodes.get(statNodePath);
+
+        StatsTrack updatedStat;
+        if (statNode == null) {
             // should not happen
-            LOG.error("Missing count node for stat {}", statNode);
+            LOG.error("Missing node for stat {}", statNodePath);
             return;
         }
-        synchronized (node) {
-            updatedStat = new StatsTrack(new String(node.data, UTF_8));
+        synchronized (statNode) {
+            updatedStat = new StatsTrack(statNode.data);
             updatedStat.setCount(updatedStat.getCount() + countDiff);
             updatedStat.setBytes(updatedStat.getBytes() + bytesDiff);
-            node.data = updatedStat.toString().getBytes(UTF_8);
-        }
-        // now check if the counts match the quota
-        String quotaNode = Quotas.quotaPath(lastPrefix);
-        node = nodes.get(quotaNode);
-        StatsTrack thisStats = null;
-        if (node == null) {
-            // should not happen
-            LOG.error("Missing count node for quota {}", quotaNode);
-            return;
-        }
-        synchronized (node) {
-            thisStats = new StatsTrack(new String(node.data, UTF_8));
-        }
-        if (thisStats.getCount() > -1 && (thisStats.getCount() < updatedStat.getCount())) {
-            LOG.warn(
-                "Quota exceeded: {} count={} limit={}",
-                lastPrefix,
-                updatedStat.getCount(),
-                thisStats.getCount());
-        }
-        if (thisStats.getBytes() > -1 && (thisStats.getBytes() < updatedStat.getBytes())) {
-            LOG.warn(
-                "Quota exceeded: {} bytes={} limit={}",
-                lastPrefix,
-                updatedStat.getBytes(),
-                thisStats.getBytes());
+
+            statNode.data = updatedStat.getStatsBytes();
         }
     }
 
@@ -537,18 +512,18 @@ public class DataTree {
             if (Quotas.limitNode.equals(childName)) {
                 // this is the limit node
                 // get the parent and add it to the trie
-                pTrie.addPath(parentName.substring(quotaZookeeper.length()));
+                pTrie.addPath(Quotas.trimQuotaPath(parentName));
             }
             if (Quotas.statNode.equals(childName)) {
-                updateQuotaForPath(parentName.substring(quotaZookeeper.length()));
+                updateQuotaForPath(Quotas.trimQuotaPath(parentName));
             }
         }
-        // also check to update the quotas for this node
+
         String lastPrefix = getMaxPrefixWithQuota(path);
         long bytes = data == null ? 0 : data.length;
-        if (lastPrefix != null) {
-            // ok we have some match and need to update
-            updateCountBytes(lastPrefix, bytes, 1);
+        // also check to update the quotas for this node
+        if (lastPrefix != null) {    // ok we have some match and need to update
+            updateQuotaStat(lastPrefix, bytes, 1);
         }
         updateWriteStat(path, bytes);
         dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
@@ -621,18 +596,18 @@ public class DataTree {
         if (parentName.startsWith(procZookeeper) && Quotas.limitNode.equals(childName)) {
             // delete the node in the trie.
             // we need to update the trie as well
-            pTrie.deletePath(parentName.substring(quotaZookeeper.length()));
+            pTrie.deletePath(Quotas.trimQuotaPath(parentName));
         }
 
         // also check to update the quotas for this node
         String lastPrefix = getMaxPrefixWithQuota(path);
         if (lastPrefix != null) {
             // ok we have some match and need to update
-            int bytes = 0;
+            long bytes = 0;
             synchronized (node) {
                 bytes = (node.data == null ? 0 : -(node.data.length));
             }
-            updateCountBytes(lastPrefix, bytes, -1);
+            updateQuotaStat(lastPrefix, bytes, -1);
         }
 
         updateWriteStat(path, 0L);
@@ -670,11 +645,14 @@ public class DataTree {
             n.copyStat(s);
             nodes.postChange(path, n);
         }
-        // now update if the path is in a quota subtree.
+
+        // compute the byte delta used for the quota stat update.
         String lastPrefix = getMaxPrefixWithQuota(path);
+        long bytesDiff = (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length);
+        // now update if the path is in a quota subtree.
         long dataBytes = data == null ? 0 : data.length;
         if (lastPrefix != null) {
-            this.updateCountBytes(lastPrefix, dataBytes - (lastdata == null ? 0 : lastdata.length), 0);
+            updateQuotaStat(lastPrefix, bytesDiff, 0);
         }
         nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata));
 
@@ -1252,7 +1230,7 @@ public class DataTree {
         StatsTrack strack = new StatsTrack();
         strack.setBytes(c.bytes);
         strack.setCount(c.count);
-        String statPath = Quotas.quotaZookeeper + path + "/" + Quotas.statNode;
+        String statPath = Quotas.statPath(path);
         DataNode node = getNode(statPath);
         // it should exist
         if (node == null) {
@@ -1261,7 +1239,7 @@ public class DataTree {
         }
         synchronized (node) {
             nodes.preChange(statPath, node);
-            node.data = strack.toString().getBytes(UTF_8);
+            node.data = strack.getStatsBytes();
             nodes.postChange(statPath, node);
         }
     }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index 6eb7b96..3e6d479 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -394,6 +394,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             validatePath(path, request.sessionId);
             nodeRecord = getRecordForPath(path);
             zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
+            zks.checkQuota(path, setDataRequest.getData(), OpCode.setData);
             int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
             request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
             nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
@@ -697,6 +698,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             throw new KeeperException.NoChildrenForEphemeralsException(path);
         }
         int newCversion = parentRecord.stat.getCversion() + 1;
+        zks.checkQuota(path, data, OpCode.create);
         if (type == OpCode.createContainer) {
             request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
         } else if (type == OpCode.createTTL) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 7ef687e..5c3f5f4 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -46,10 +46,13 @@ import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.Quotas;
+import org.apache.zookeeper.StatsTrack;
 import org.apache.zookeeper.Version;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZookeeperBanner;
+import org.apache.zookeeper.common.StringUtils;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
@@ -95,11 +98,13 @@ import org.slf4j.LoggerFactory;
 public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
     protected static final Logger LOG;
+    private static final RateLogger RATE_LOGGER;
 
     public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";
 
     public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
     public static final String SKIP_ACL = "zookeeper.skipACL";
+    public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota";
 
     // When enabled, will check ACL constraints appertained to the requests first,
     // before sending the requests to the quorum.
@@ -107,6 +112,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
     static final boolean skipACL;
 
+    public static final boolean enforceQuota;
+
     public static final String SASL_SUPER_USER = "zookeeper.superUser";
 
     public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
@@ -121,6 +128,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     static {
         LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
 
+        RATE_LOGGER = new RateLogger(LOG);
+
         ZookeeperBanner.printBanner(LOG);
 
         Environment.logEnv("Server environment:", LOG);
@@ -133,6 +142,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL);
         }
 
+        enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false"));
+        if (enforceQuota) {
+            LOG.info("{} = {}, quota enforcement is enabled", ENFORCE_QUOTA, enforceQuota);
+        }
+
         digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
         LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
 
@@ -2002,6 +2016,123 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         throw new KeeperException.NoAuthException();
     }
 
+    /**
+     * Check whether a request on the given path would exceed the quota.
+     *
+     * @param path
+     *            the path of the node
+     * @param data
+     *            the data of the path
+     * @param type
+     *            the OpCode of the request; currently only create and setData are checked
+     */
+
+    public void checkQuota(String path, byte[] data, int type) throws KeeperException.QuotaExceededException {
+        if (!enforceQuota) {
+            return;
+        }
+        long dataBytes = (data == null) ? 0 : data.length;
+        ZKDatabase zkDatabase = getZKDatabase();
+        String lastPrefix = zkDatabase.getDataTree().getMaxPrefixWithQuota(path);
+        if (StringUtils.isEmpty(lastPrefix)) {
+            return;
+        }
+
+        switch (type) {
+            case OpCode.create:
+                checkQuota(lastPrefix, dataBytes, 1);
+                break;
+            case OpCode.setData:
+                DataNode node = zkDatabase.getDataTree().getNode(path);
+                byte[] lastData;
+                synchronized (node) {
+                    lastData = node.getData();
+                }
+                checkQuota(lastPrefix, dataBytes - (lastData == null ? 0 : lastData.length), 0);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type);
+        }
+    }
+
+    /**
+     * Check whether applying the given diffs would exceed the quota.
+     *
+     * @param lastPrefix
+     *            the path of the node which has a quota.
+     * @param bytesDiff
+     *            the diff to be added to number of bytes
+     * @param countDiff
+     *            the diff to be added to the count
+     */
+    private void checkQuota(String lastPrefix, long bytesDiff, long countDiff)
+            throws KeeperException.QuotaExceededException {
+        LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff);
+
+        // now check the quota we set
+        String limitNode = Quotas.limitPath(lastPrefix);
+        DataNode node = getZKDatabase().getNode(limitNode);
+        StatsTrack limitStats;
+        if (node == null) {
+            // should not happen
+            LOG.error("Missing limit node for quota {}", limitNode);
+            return;
+        }
+        synchronized (node) {
+            limitStats = new StatsTrack(node.data);
+        }
+        //check the quota
+        boolean checkCountQuota = countDiff != 0 && (limitStats.getCount() > -1 || limitStats.getCountHardLimit() > -1);
+        boolean checkByteQuota = bytesDiff != 0 && (limitStats.getBytes() > -1 || limitStats.getByteHardLimit() > -1);
+
+        if (!checkCountQuota && !checkByteQuota) {
+            return;
+        }
+
+        //check the statPath quota
+        String statNode = Quotas.statPath(lastPrefix);
+        node = getZKDatabase().getNode(statNode);
+
+        StatsTrack currentStats;
+        if (node == null) {
+            // should not happen
+            LOG.error("Missing node for stat {}", statNode);
+            return;
+        }
+        synchronized (node) {
+            currentStats = new StatsTrack(node.data);
+        }
+
+        //check the Count Quota
+        if (checkCountQuota) {
+            long newCount = currentStats.getCount() + countDiff;
+            boolean isCountHardLimit = limitStats.getCountHardLimit() > -1;
+            long countLimit = isCountHardLimit ? limitStats.getCountHardLimit() : limitStats.getCount();
+
+            if (newCount > countLimit) {
+                String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]";
+                RATE_LOGGER.rateLimitLog(msg);
+                if (isCountHardLimit) {
+                    throw new KeeperException.QuotaExceededException(lastPrefix);
+                }
+            }
+        }
+
+        //check the Byte Quota
+        if (checkByteQuota) {
+            long newBytes = currentStats.getBytes() + bytesDiff;
+            boolean isByteHardLimit = limitStats.getByteHardLimit() > -1;
+            long byteLimit = isByteHardLimit ? limitStats.getByteHardLimit() : limitStats.getBytes();
+            if (newBytes > byteLimit) {
+                String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]";
+                RATE_LOGGER.rateLimitLog(msg);
+                if (isByteHardLimit) {
+                    throw new KeeperException.QuotaExceededException(lastPrefix);
+                }
+            }
+        }
+    }
+
     public static boolean isDigestEnabled() {
         return digestEnabled;
     }
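
The private checkQuota() above applies the same decision to both the count and the byte quota: skip the check when the diff is zero or no limit is set, prefer the hard limit when one is set, log when the projected value exceeds the chosen limit, and reject only for a hard limit. A minimal sketch of that decision, free of ZooKeeper dependencies, follows. The Outcome enum and the decide() helper are hypothetical names for illustration only and do not exist in this commit.

```java
public class QuotaDecisionSketch {

    /** Hypothetical result type: OK, log a warning only, or reject the request. */
    enum Outcome { OK, LOG_ONLY, REJECT }

    /**
     * @param current   usage currently recorded in the stat node
     * @param diff      change the request would apply (count or bytes)
     * @param softLimit soft limit, -1 when unset
     * @param hardLimit hard limit, -1 when unset
     */
    static Outcome decide(long current, long diff, long softLimit, long hardLimit) {
        // Nothing to check when the request changes nothing or no limit is set.
        if (diff == 0 || (softLimit <= -1 && hardLimit <= -1)) {
            return Outcome.OK;
        }
        // A hard limit, when set, is the only limit that is compared.
        boolean hard = hardLimit > -1;
        long limit = hard ? hardLimit : softLimit;
        if (current + diff <= limit) {
            return Outcome.OK;
        }
        return hard ? Outcome.REJECT : Outcome.LOG_ONLY;
    }

    public static void main(String[] args) {
        System.out.println(decide(5, 1, -1, 5)); // prints "REJECT"
        System.out.println(decide(5, 1, 5, -1)); // prints "LOG_ONLY"
        System.out.println(decide(4, 1, -1, 5)); // prints "OK"
        System.out.println(decide(5, 1, 3, 10)); // prints "OK"
    }
}
```

The last call shows a consequence of preferring the hard limit: when both limits are set, a soft limit below the hard limit is not reported by this check.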
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
index fe83b76..151e873 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
@@ -233,8 +233,8 @@ public class DataTreeTest extends ZKTestCase {
         DataTree dserTree = new DataTree();
 
         dserTree.createNode("/bug", new byte[20], null, -1, 1, 1, 1);
-        dserTree.createNode(Quotas.quotaZookeeper + "/bug", null, null, -1, 1, 1, 1);
-        dserTree.createNode(Quotas.quotaPath("/bug"), new byte[20], null, -1, 1, 1, 1);
+        dserTree.createNode(Quotas.quotaPath("/bug"), null, null, -1, 1, 1, 1);
+        dserTree.createNode(Quotas.limitPath("/bug"), new byte[20], null, -1, 1, 1, 1);
         dserTree.createNode(Quotas.statPath("/bug"), new byte[20], null, -1, 1, 1, 1);
 
         //deserialize a DataTree; this should clear the old /bug nodes and pathTrie
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/EnforceQuotaTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/EnforceQuotaTest.java
new file mode 100644
index 0000000..470ce39
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/EnforceQuotaTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.junit.jupiter.api.Assertions.fail;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.StatsTrack;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.cli.SetQuotaCommand;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * A unit test case for when quota enforcement is disabled (the default)
+ */
+public class EnforceQuotaTest extends ClientBase {
+
+    private ZooKeeper zk;
+
+    @BeforeEach
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        zk = createClient();
+    }
+
+    @AfterEach
+    @Override
+    public void tearDown() throws Exception {
+        System.clearProperty(ZooKeeperServer.ENFORCE_QUOTA);
+        super.tearDown();
+        zk.close();
+    }
+
+    @Test
+    public void testSetQuotaDisableWhenExceedBytesHardQuota() throws Exception {
+        final String path = "/c1";
+        zk.create(path, "12345".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        StatsTrack st = new StatsTrack();
+        st.setByteHardLimit(5L);
+        SetQuotaCommand.createQuota(zk, path, st);
+
+        try {
+            zk.setData(path, "123456".getBytes(), -1);
+        } catch (KeeperException.QuotaExceededException e) {
+            fail("should not throw a byte QuotaExceededException when quota enforcement is disabled");
+        }
+    }
+
+    @Test
+    public void testSetQuotaDisableWhenExceedCountHardQuota() throws Exception {
+
+        final String path = "/c1";
+        zk.create(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        int count = 2;
+        StatsTrack st = new StatsTrack();
+        st.setCountHardLimit(count);
+        SetQuotaCommand.createQuota(zk, path, st);
+        zk.create(path + "/c2", "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        try {
+            zk.create(path + "/c2" + "/c3", "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException.QuotaExceededException e) {
+            fail("should not throw a count QuotaExceededException when quota enforcement is disabled");
+        }
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java
index 2eed9d3..ac6319b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumQuotaTest.java
@@ -39,17 +39,21 @@ public class QuorumQuotaTest extends QuorumBase {
         for (i = 0; i < 300; i++) {
             zk.create("/a/" + i, "some".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         }
-        SetQuotaCommand.createQuota(zk, "/a", 1000L, 5000);
-        String statPath = Quotas.quotaZookeeper + "/a" + "/" + Quotas.statNode;
+
+        StatsTrack quota = new StatsTrack();
+        quota.setCount(1000);
+        quota.setBytes(5000);
+        SetQuotaCommand.createQuota(zk, "/a", quota);
+        String statPath = Quotas.statPath("/a");
         byte[] data = zk.getData(statPath, false, new Stat());
-        StatsTrack st = new StatsTrack(new String(data));
+        StatsTrack st = new StatsTrack(data);
         assertTrue(st.getBytes() == 1204L, "bytes are set");
         assertTrue(st.getCount() == 301, "num count is set");
         for (i = 300; i < 600; i++) {
             zk.create("/a/" + i, "some".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         }
         data = zk.getData(statPath, false, new Stat());
-        st = new StatsTrack(new String(data));
+        st = new StatsTrack(data);
         assertTrue(st.getBytes() == 2404L, "bytes are set");
         assertTrue(st.getCount() == 601, "num count is set");
     }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuotasTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuotasTest.java
new file mode 100644
index 0000000..b887e28
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuotasTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.zookeeper.Quotas;
+import org.junit.Test;
+
+public class QuotasTest {
+
+    @Test
+    public void testStatPath() {
+        assertEquals("/zookeeper/quota/foo/zookeeper_stats", Quotas.statPath("/foo"));
+        assertEquals("/zookeeper/quota/bar/zookeeper_stats", Quotas.statPath("/bar"));
+    }
+
+    @Test
+    public void testLimitPath() {
+        assertEquals("/zookeeper/quota/foo/zookeeper_limits", Quotas.limitPath("/foo"));
+        assertEquals("/zookeeper/quota/bar/zookeeper_limits", Quotas.limitPath("/bar"));
+    }
+
+    @Test
+    public void testQuotaPathPath() {
+        assertEquals("/zookeeper/quota/bar", Quotas.quotaPath("/bar"));
+        assertEquals("/zookeeper/quota/foo", Quotas.quotaPath("/foo"));
+    }
+
+    @Test
+    public void testTrimQuotaPath() {
+        assertEquals("/foo", Quotas.trimQuotaPath("/zookeeper/quota/foo"));
+        assertEquals("/bar", Quotas.trimQuotaPath("/zookeeper/quota/bar"));
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/StatsTrackTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/StatsTrackTest.java
new file mode 100644
index 0000000..978e629
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/StatsTrackTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.StatsTrack;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StatsTrackTest {
+
+    public static class OldStatsTrack {
+        private int count;
+        private long bytes;
+        private String countStr = "count";
+        private String byteStr = "bytes";
+
+        /**
+         * a default constructor for
+         * stats
+         */
+        public OldStatsTrack() {
+            this(null);
+        }
+        /**
+         * the stat string should be of the form count=int,bytes=long
+         * if stats is called with null the count and bytes are initialized
+         * to -1.
+         * @param stats the stat string to be initialized with
+         */
+        public OldStatsTrack(String stats) {
+            if (stats == null) {
+                stats = "count=-1,bytes=-1";
+            }
+            String[] split = stats.split(",");
+            if (split.length != 2) {
+                throw new IllegalArgumentException("invalid string " + stats);
+            }
+            count = Integer.parseInt(split[0].split("=")[1]);
+            bytes = Long.parseLong(split[1].split("=")[1]);
+        }
+
+
+        /**
+         * get the count of nodes allowed as part of quota
+         *
+         * @return the count as part of this string
+         */
+        public int getCount() {
+            return this.count;
+        }
+
+        /**
+         * set the count for this stat tracker.
+         *
+         * @param count
+         *            the count to set with
+         */
+        public void setCount(int count) {
+            this.count = count;
+        }
+
+        /**
+         * get the count of bytes allowed as part of quota
+         *
+         * @return the bytes as part of this string
+         */
+        public long getBytes() {
+            return this.bytes;
+        }
+
+        /**
+         * set the bytes for this stat tracker.
+         *
+         * @param bytes
+         *            the bytes to set with
+         */
+        public void setBytes(long bytes) {
+            this.bytes = bytes;
+        }
+
+        @Override
+        /*
+         * returns the string that maps to this stat tracking.
+         */
+        public String toString() {
+            return countStr + "=" + count + "," + byteStr + "=" + bytes;
+        }
+    }
+
+    @Test
+    public void testBackwardCompatibility() {
+        StatsTrack quota = new StatsTrack();
+        quota.setCount(4);
+        quota.setCountHardLimit(4);
+        quota.setBytes(9L);
+        quota.setByteHardLimit(15L);
+        Assert.assertEquals("count=4,bytes=9=;byteHardLimit=15;countHardLimit=4", quota.toString());
+
+        OldStatsTrack ost = new OldStatsTrack(quota.toString());
+        Assert.assertTrue("bytes are set", ost.getBytes() == 9L);
+        Assert.assertTrue("num count is set", ost.getCount() == 4);
+        Assert.assertEquals("count=4,bytes=9", ost.toString());
+    }
+
+    @Test
+    public void testUpwardCompatibility() {
+        OldStatsTrack ost = new OldStatsTrack(null);
+        ost.setCount(2);
+        ost.setBytes(5);
+        Assert.assertEquals("count=2,bytes=5", ost.toString());
+
+        StatsTrack st = new StatsTrack(ost.toString());
+        Assert.assertEquals("count=2,bytes=5", st.toString());
+        Assert.assertEquals(5, st.getBytes());
+        Assert.assertEquals(2, st.getCount());
+        Assert.assertEquals(-1, st.getByteHardLimit());
+        Assert.assertEquals(-1, st.getCountHardLimit());
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ZooKeeperQuotaTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ZooKeeperQuotaTest.java
index 99a595f..67b524a 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ZooKeeperQuotaTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ZooKeeperQuotaTest.java
@@ -21,24 +21,49 @@ package org.apache.zookeeper.test;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import java.io.IOException;
+import java.util.List;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.QuotaExceededException;
 import org.apache.zookeeper.Quotas;
 import org.apache.zookeeper.StatsTrack;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.cli.DelQuotaCommand;
+import org.apache.zookeeper.cli.ListQuotaCommand;
 import org.apache.zookeeper.cli.MalformedPathException;
 import org.apache.zookeeper.cli.SetQuotaCommand;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.StatsTrackTest.OldStatsTrack;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 public class ZooKeeperQuotaTest extends ClientBase {
+    private ZooKeeper zk = null;
+
+    @BeforeEach
+    @Override
+    public void setUp() throws Exception {
+        System.setProperty(ZooKeeperServer.ENFORCE_QUOTA, "true");
+        super.setUp();
+        zk = createClient();
+    }
+
+    @AfterEach
+    @Override
+    public void tearDown() throws Exception {
+        System.clearProperty(ZooKeeperServer.ENFORCE_QUOTA);
+        super.tearDown();
+        zk.close();
+    }
 
     @Test
     public void testQuota() throws Exception {
-        final ZooKeeper zk = createClient();
+
         final String path = "/a/b/v";
         // making sure setdata works on /
         zk.setData("/", "some".getBytes(), -1);
@@ -49,18 +74,31 @@ public class ZooKeeperQuotaTest extends ClientBase {
         zk.create("/a/b/v", "some".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
         zk.create("/a/b/v/d", "some".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        SetQuotaCommand.createQuota(zk, path, 5L, 10);
+
+        StatsTrack quota = new StatsTrack();
+        quota.setCount(4);
+        quota.setCountHardLimit(4);
+        quota.setBytes(9L);
+        quota.setByteHardLimit(15L);
+        SetQuotaCommand.createQuota(zk, path, quota);
 
         // see if its set
-        String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode;
+        String absolutePath = Quotas.limitPath(path);
         byte[] data = zk.getData(absolutePath, false, new Stat());
-        StatsTrack st = new StatsTrack(new String(data));
-        assertTrue(st.getBytes() == 5L, "bytes are set");
-        assertTrue(st.getCount() == 10, "num count is set");
+        StatsTrack st = new StatsTrack(data);
+        assertTrue(st.getBytes() == 9L, "bytes are set");
+        assertTrue(st.getByteHardLimit() == 15L, "byte hard limit is set");
+        assertTrue(st.getCount() == 4, "num count is set");
+        assertTrue(st.getCountHardLimit() == 4, "count hard limit is set");
+
+        // check quota node readable by old servers
+        OldStatsTrack ost = new OldStatsTrack(new String(data));
+        assertTrue(ost.getBytes() == 9L, "bytes are set");
+        assertTrue(ost.getCount() == 4, "num count is set");
 
-        String statPath = Quotas.quotaZookeeper + path + "/" + Quotas.statNode;
+        String statPath = Quotas.statPath(path);
         byte[] qdata = zk.getData(statPath, false, new Stat());
-        StatsTrack qst = new StatsTrack(new String(qdata));
+        StatsTrack qst = new StatsTrack(qdata);
         assertTrue(qst.getBytes() == 8L, "bytes are set");
         assertTrue(qst.getCount() == 2, "count is set");
 
@@ -75,7 +113,6 @@ public class ZooKeeperQuotaTest extends ClientBase {
 
     @Test
     public void testSetQuota() throws IOException, InterruptedException, KeeperException, MalformedPathException {
-        final ZooKeeper zk = createClient();
 
         String path = "/c1";
         String nodeData = "foo";
@@ -83,18 +120,21 @@ public class ZooKeeperQuotaTest extends ClientBase {
 
         int count = 10;
         long bytes = 5L;
-        SetQuotaCommand.createQuota(zk, path, bytes, count);
+        StatsTrack quota = new StatsTrack();
+        quota.setCount(count);
+        quota.setBytes(bytes);
+        SetQuotaCommand.createQuota(zk, path, quota);
 
         //check the limit
-        String absoluteLimitPath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode;
+        String absoluteLimitPath = Quotas.limitPath(path);
         byte[] data = zk.getData(absoluteLimitPath, false, null);
-        StatsTrack st = new StatsTrack(new String(data));
+        StatsTrack st = new StatsTrack(data);
         assertEquals(bytes, st.getBytes());
         assertEquals(count, st.getCount());
         //check the stats
-        String absoluteStatPath = Quotas.quotaZookeeper + path + "/" + Quotas.statNode;
+        String absoluteStatPath = Quotas.statPath(path);
         data = zk.getData(absoluteStatPath, false, null);
-        st = new StatsTrack(new String(data));
+        st = new StatsTrack(data);
         assertEquals(nodeData.length(), st.getBytes());
         assertEquals(1, st.getCount());
 
@@ -103,9 +143,9 @@ public class ZooKeeperQuotaTest extends ClientBase {
         String nodeData2 = "bar";
         zk.create(path2, nodeData2.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
-        absoluteStatPath = Quotas.quotaZookeeper + path + "/" + Quotas.statNode;
+        absoluteStatPath = Quotas.statPath(path);
         data = zk.getData(absoluteStatPath, false, null);
-        st = new StatsTrack(new String(data));
+        st = new StatsTrack(data);
         //check the stats
         assertEquals(nodeData.length() + nodeData2.length(), st.getBytes());
         assertEquals(2, st.getCount());
@@ -113,7 +153,6 @@ public class ZooKeeperQuotaTest extends ClientBase {
 
     @Test
     public void testSetQuotaWhenSetQuotaOnParentOrChildPath() throws IOException, InterruptedException, KeeperException, MalformedPathException {
-        final ZooKeeper zk = createClient();
 
         zk.create("/c1", "some".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         zk.create("/c1/c2", "some".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -122,19 +161,344 @@ public class ZooKeeperQuotaTest extends ClientBase {
         zk.create("/c1/c2/c3/c4/c5", "some".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
         //set the quota on the path:/c1/c2/c3
-        SetQuotaCommand.createQuota(zk, "/c1/c2/c3", 5L, 10);
+        StatsTrack quota = new StatsTrack();
+        quota.setCount(5);
+        quota.setBytes(10);
+        SetQuotaCommand.createQuota(zk, "/c1/c2/c3", quota);
 
         try {
-            SetQuotaCommand.createQuota(zk, "/c1", 5L, 10);
+            SetQuotaCommand.createQuota(zk, "/c1", quota);
+            fail("should not set quota when child has a quota");
         } catch (IllegalArgumentException e) {
             assertEquals("/c1 has a child /c1/c2/c3 which has a quota", e.getMessage());
         }
 
         try {
-            SetQuotaCommand.createQuota(zk, "/c1/c2/c3/c4/c5", 5L, 10);
+            SetQuotaCommand.createQuota(zk, "/c1/c2/c3/c4/c5", quota);
+            fail("should not set quota when parent has a quota");
         } catch (IllegalArgumentException e) {
             assertEquals("/c1/c2/c3/c4/c5 has a parent /c1/c2/c3 which has a quota", e.getMessage());
         }
     }
 
-}
+    @Test
+    public void testSetQuotaWhenExceedBytesSoftQuota() throws Exception {
+
+        final String path = "/c1";
+        zk.create(path, "data".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        StatsTrack st = new StatsTrack();
+        st.setBytes(5L);
+        SetQuotaCommand.createQuota(zk, path, st);
+
+        zk.setData(path, "12345".getBytes(), -1);
+
+        try {
+            zk.setData(path, "123456".getBytes(), -1);
+        } catch (Exception e) {
+            fail("should set data which exceeds the soft byte quota");
+        }
+    }
+
+    @Test
+    public void testSetQuotaWhenExceedBytesHardQuota() throws Exception {
+
+        final String path = "/c1";
+        zk.create(path, "12345".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        StatsTrack st = new StatsTrack();
+        st.setByteHardLimit(5L);
+        SetQuotaCommand.createQuota(zk, path, st);
+
+        try {
+            zk.setData(path, "123456".getBytes(), -1);
+            fail("should not set data which exceeds the hard byte quota");
+        } catch (QuotaExceededException e) {
+            //expected
+        }
+    }
+
+    @Test
+    public void testSetQuotaWhenExceedBytesHardQuotaExtend() throws Exception {
+
+        String path = "/c0";
+        zk.create(path, "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        int bytes = 100;
+        StatsTrack st = new StatsTrack();
+        st.setByteHardLimit(bytes);
+        SetQuotaCommand.createQuota(zk, path, st);
+        StringBuilder sb = new StringBuilder(path);
+        for (int i = 1; i <= bytes; i++) {
+            sb.append("/c" + i);
+            if (i == bytes) {
+                try {
+                    zk.create(sb.toString(), "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    fail("should not create a node when the hard byte quota is exceeded");
+                } catch (QuotaExceededException e) {
+                    //expected
+                }
+            } else {
+                zk.create(sb.toString(), "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+        }
+    }
+
+    @Test
+    public void testSetQuotaWhenSetQuotaLessThanExistBytes() throws Exception {
+
+        String path = "/c0";
+        zk.create(path, "123456789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        int bytes = 5;
+        StatsTrack st = new StatsTrack();
+        st.setByteHardLimit(bytes);
+        SetQuotaCommand.createQuota(zk, path, st);
+        try {
+            zk.setData(path, "123456".getBytes(), -1);
+            fail("should not set data when the hard byte quota is exceeded");
+        } catch (QuotaExceededException e) {
+            //expected
+        }
+    }
+
+    @Test
+    public void testSetQuotaWhenSetChildDataExceedBytesQuota() throws Exception {
+
+        final String path = "/test/quota";
+        zk.create("/test", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/test/quota", "01234".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/test/quota/data", "56789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        StatsTrack quota = new StatsTrack();
+        quota.setByteHardLimit(10);
+        SetQuotaCommand.createQuota(zk, path, quota);
+        try {
+            zk.setData("/test/quota/data", "567891".getBytes(), -1);
+            fail("should not set data when the hard byte quota is exceeded");
+        } catch (QuotaExceededException e) {
+            //expected
+        }
+    }
+
+    @Test
+    public void testSetQuotaWhenCreateNodeExceedBytesQuota() throws Exception {
+
+        final String path = "/test/quota";
+        zk.create("/test", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/test/quota", "01234".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        StatsTrack quota = new StatsTrack();
+        quota.setByteHardLimit(10);
+        SetQuotaCommand.createQuota(zk, path, quota);
+        try {
+            zk.create("/test/quota/data", "567891".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            fail("should not create a node when the hard byte quota is exceeded");
+        } catch (QuotaExceededException e) {
+            //expected
+        }
+    }
+
+    @Test
+    public void testSetQuotaWhenExceedCountSoftQuota() throws Exception {
+
+        final String path = "/c1";
+        zk.create(path, "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        int count = 2;
+        StatsTrack st = new StatsTrack();
+        st.setCount(count);
+        SetQuotaCommand.createQuota(zk, path, st);
+        zk.create(path + "/c2", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        try {
+            zk.create(path + "/c2" + "/c3", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (QuotaExceededException e) {
+            fail("should be able to create a node that exceeds the soft count quota");
+        }
+    }
+
+    @Test
+    public void testSetQuotaWhenExceedCountHardQuota() throws Exception {
+
+        final String path = "/c1";
+        zk.create(path, "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        int count = 2;
+        StatsTrack st = new StatsTrack();
+        st.setCountHardLimit(count);
+        SetQuotaCommand.createQuota(zk, path, st);
+        zk.create(path + "/c2", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        try {
+            zk.create(path + "/c2" + "/c3", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            fail("should not create a node when the hard count quota is exceeded");
+        } catch (QuotaExceededException e) {
+            //expected
+        }
+    }
+
+    @Test
+    public void testSetQuotaWhenExceedCountHardQuotaExtend() throws Exception {
+
+        String path = "/c0";
+        zk.create(path, "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        int count = 100;
+        StatsTrack st = new StatsTrack();
+        st.setCountHardLimit(count);
+        SetQuotaCommand.createQuota(zk, path, st);
+        StringBuilder sb = new StringBuilder(path);
+        for (int i = 1; i <= count; i++) {
+            sb.append("/c" + i);
+            if (i == count) {
+                try {
+                    zk.create(sb.toString(), "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    fail("should not create a node when the hard count quota is exceeded");
+                } catch (QuotaExceededException e) {
+                    //expected
+                }
+            } else {
+                zk.create(sb.toString(), "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+        }
+    }
+
+    @Test
+    public void testSetQuotaWhenSetQuotaLessThanExistCount() throws Exception {
+
+        String path = "/c0";
+        zk.create(path, "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create(path + "/c1", "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create(path + "/c2", "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        int count = 2;
+        StatsTrack st = new StatsTrack();
+        st.setCountHardLimit(count);
+        SetQuotaCommand.createQuota(zk, path, st);
+        try {
+            zk.create(path + "/c3", "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            fail("should not create a node when the hard count quota is exceeded");
+        } catch (QuotaExceededException e) {
+            //expected
+        }
+    }
+
+    @Test
+    public void testSetQuotaWhenExceedBothBytesAndCountHardQuota() throws Exception {
+
+        final String path = "/c1";
+        zk.create(path, "12345".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        StatsTrack st = new StatsTrack();
+        st.setByteHardLimit(5L);
+        st.setCountHardLimit(1);
+        SetQuotaCommand.createQuota(zk, path, st);
+
+        try {
+            zk.create(path + "/c2", "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            fail("should throw QuotaExceededException when both the count and byte hard quotas are exceeded");
+        } catch (QuotaExceededException e) {
+            //expected
+        }
+    }
+
+    @Test
+    public void testDeleteBytesQuota() throws Exception {
+
+        final String path = "/c1";
+        zk.create(path, "12345".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        StatsTrack st = new StatsTrack();
+        st.setByteHardLimit(5L);
+        SetQuotaCommand.createQuota(zk, path, st);
+
+        try {
+            zk.setData(path, "123456".getBytes(), -1);
+            fail("should not set data which exceeds the hard byte quota");
+        } catch (QuotaExceededException e) {
+            //expected
+        }
+
+        //delete the Byte Hard Quota
+        st = new StatsTrack();
+        st.setByteHardLimit(1);
+        DelQuotaCommand.delQuota(zk, path, st);
+
+        zk.setData(path, "123456".getBytes(), -1);
+    }
+
+    @Test
+    public void testDeleteCountQuota() throws Exception {
+
+        final String path = "/c1";
+        zk.create(path, "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        int count = 2;
+        StatsTrack st = new StatsTrack();
+        st.setCountHardLimit(count);
+        SetQuotaCommand.createQuota(zk, path, st);
+        zk.create(path + "/c2", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        try {
+            zk.create(path + "/c2" + "/c3", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            fail("should not create a node when the hard count quota is exceeded");
+        } catch (QuotaExceededException e) {
+            //expected
+        }
+
+        //delete the Count Hard Quota
+        st = new StatsTrack();
+        st.setCountHardLimit(1);
+        DelQuotaCommand.delQuota(zk, path, st);
+
+        zk.create(path + "/c2" + "/c3", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+    @Test
+    public void testListQuota() throws Exception {
+
+        final String path = "/c1";
+        zk.create(path, "12345".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        StatsTrack st = new StatsTrack();
+        long bytes = 5L;
+        int count = 10;
+        long byteHardLimit = 6L;
+        int countHardLimit = 12;
+        st.setBytes(bytes);
+        st.setCount(count);
+        st.setByteHardLimit(byteHardLimit);
+        st.setCountHardLimit(countHardLimit);
+        SetQuotaCommand.createQuota(zk, path, st);
+
+        List<StatsTrack> statsTracks = ListQuotaCommand.listQuota(zk, path);
+        for (int i = 0; i < statsTracks.size(); i++) {
+            st = statsTracks.get(i);
+            if (i == 0) {
+                assertEquals(count, st.getCount());
+                assertEquals(countHardLimit, st.getCountHardLimit());
+                assertEquals(bytes, st.getBytes());
+                assertEquals(byteHardLimit, st.getByteHardLimit());
+            } else {
+                assertEquals(1, st.getCount());
+                assertEquals(-1, st.getCountHardLimit());
+                assertEquals(5, st.getBytes());
+                assertEquals(-1, st.getByteHardLimit());
+            }
+        }
+        //delete all the quotas (soft and hard, bytes and count)
+        st = new StatsTrack();
+        st.setByteHardLimit(1);
+        st.setBytes(1);
+        st.setCountHardLimit(1);
+        st.setCount(1);
+        DelQuotaCommand.delQuota(zk, path, st);
+
+        statsTracks = ListQuotaCommand.listQuota(zk, path);
+        for (int i = 0; i < statsTracks.size(); i++) {
+            st = statsTracks.get(i);
+            if (i == 0) {
+                assertEquals(-1, st.getCount());
+                assertEquals(-1, st.getCountHardLimit());
+                assertEquals(-1, st.getBytes());
+                assertEquals(-1, st.getByteHardLimit());
+            } else {
+                assertEquals(1, st.getCount());
+                assertEquals(-1, st.getCountHardLimit());
+                assertEquals(5, st.getBytes());
+                assertEquals(-1, st.getByteHardLimit());
+            }
+        }
+    }
+}
\ No newline at end of file