You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/05/15 03:21:35 UTC

[5/9] git commit: Added internal filter and queueing APIs.

Added internal filter and queueing APIs.

Review: https://reviews.apache.org/r/20781


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c3fe0246
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c3fe0246
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c3fe0246

Branch: refs/heads/master
Commit: c3fe02466fe733c3b7fd99458ef637bf348eac10
Parents: b99402f
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Apr 28 09:44:07 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed May 14 17:38:43 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                         |  13 +-
 src/linux/routing/filter/action.hpp     |  76 ++++
 src/linux/routing/filter/filter.hpp     | 108 +++++
 src/linux/routing/filter/internal.hpp   | 649 +++++++++++++++++++++++++++
 src/linux/routing/filter/priority.hpp   |  66 +++
 src/linux/routing/queueing/handle.cpp   |  30 ++
 src/linux/routing/queueing/handle.hpp   |  74 +++
 src/linux/routing/queueing/ingress.cpp  | 126 ++++++
 src/linux/routing/queueing/ingress.hpp  |  53 +++
 src/linux/routing/queueing/internal.hpp | 272 +++++++++++
 10 files changed, 1465 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c3fe0246/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index debc77a..42acff1 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -269,13 +269,22 @@ endif
 if WITH_NETWORK_ISOLATOR
   libmesos_no_3rdparty_la_SOURCES +=					\
 	linux/routing/utils.cpp						\
-	linux/routing/link/link.cpp
+	linux/routing/link/link.cpp					\
+	linux/routing/queueing/handle.cpp				\
+	linux/routing/queueing/ingress.cpp
 
   libmesos_no_3rdparty_la_SOURCES +=					\
 	linux/routing/internal.hpp					\
 	linux/routing/utils.hpp						\
+	linux/routing/filter/action.hpp					\
+	linux/routing/filter/filter.hpp					\
+	linux/routing/filter/internal.hpp				\
+	linux/routing/filter/priority.hpp				\
 	linux/routing/link/internal.hpp					\
-	linux/routing/link/link.hpp
+	linux/routing/link/link.hpp					\
+	linux/routing/queueing/handle.hpp				\
+	linux/routing/queueing/ingress.hpp				\
+	linux/routing/queueing/internal.hpp
 endif
 
 libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp		\

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3fe0246/src/linux/routing/filter/action.hpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/filter/action.hpp b/src/linux/routing/filter/action.hpp
new file mode 100644
index 0000000..8cc6594
--- /dev/null
+++ b/src/linux/routing/filter/action.hpp
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LINUX_ROUTING_FILTER_ACTION_HPP__
+#define __LINUX_ROUTING_FILTER_ACTION_HPP__
+
+#include <set>
+#include <string>
+
+namespace routing {
+namespace action {
+
+// Polymorphic base class for filter actions (e.g., Redirect, Mirror).
+class Action
+{
+public:
+  virtual ~Action() {}
+
+protected:
+  // Protected so that only derived actions can be instantiated.
+  Action() {}
+};
+
+
+// Represents an action that redirects a packet to a given link.
+// Currently, the kernel only supports redirecting to the egress of
+// the given link.
+class Redirect : public Action
+{
+public:
+  explicit Redirect(const std::string& _link)
+    : link_(_link) {}
+
+  const std::string& link() const { return link_; }
+
+private:
+  // The link to which the packet will be redirected.
+  std::string link_;
+};
+
+
+// Represents an action that mirrors a packet to a set of links.
+// Currently, the kernel only supports mirroring to the egress of
+// each link.
+class Mirror : public Action
+{
+public:
+  explicit Mirror(const std::set<std::string>& _links)
+    : links_(_links) {}
+
+  const std::set<std::string>& links() const { return links_; }
+
+private:
+  // The set of links to which the packet will be mirrored.
+  std::set<std::string> links_;
+};
+
+} // namespace action {
+} // namespace routing {
+
+#endif // __LINUX_ROUTING_FILTER_ACTION_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3fe0246/src/linux/routing/filter/filter.hpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/filter/filter.hpp b/src/linux/routing/filter/filter.hpp
new file mode 100644
index 0000000..40e21db
--- /dev/null
+++ b/src/linux/routing/filter/filter.hpp
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LINUX_ROUTING_FILTER_FILTER_HPP__
+#define __LINUX_ROUTING_FILTER_FILTER_HPP__
+
+#include <vector>
+
+#include <process/shared.hpp>
+
+#include <stout/option.hpp>
+
+#include "linux/routing/filter/action.hpp"
+#include "linux/routing/filter/priority.hpp"
+
+#include "linux/routing/queueing/handle.hpp"
+
+namespace routing {
+namespace filter {
+
+// Our representation of a filter. Each filter is attached to a
+// 'parent' (either a queueing discipline or queueing class), and
+// contains a 'classifier' which defines how packets will be matched,
+// a 'priority' which defines the order in which filters will be
+// applied, and a series of 'actions' that will be taken when a packet
+// satisfies the conditions specified in the classifier. If the
+// priority is not specified, the kernel will assign a priority to the
+// filter.
+// TODO(jieyu): Currently, this data structure is not directly exposed
+// to the user because libnl does not support getting actions of a
+// filter. Expose this data structure once libnl fixes the issue.
+template <typename Classifier>
+class Filter
+{
+public:
+  // Creates a filter with no action.
+  Filter(const queueing::Handle& _parent,
+         const Classifier& _classifier,
+         const Option<Priority>& _priority)
+    : parent_(_parent),
+      classifier_(_classifier),
+      priority_(_priority) {}
+
+  // TODO(jieyu): Support arbitrary number of actions.
+  template <typename Action>
+  Filter(const queueing::Handle& _parent,
+         const Classifier& _classifier,
+         const Option<Priority>& _priority,
+         const Action& action)
+    : parent_(_parent),
+      classifier_(_classifier),
+      priority_(_priority)
+  {
+    attach(action);
+  }
+
+  // Attaches a copy of 'action' to this filter.
+  template <typename A>
+  void attach(const A& action)
+  {
+    actions_.push_back(process::Shared<action::Action>(new A(action)));
+  }
+
+  const queueing::Handle& parent() const { return parent_; }
+  const Classifier& classifier() const { return classifier_; }
+  const Option<Priority>& priority() const { return priority_; }
+
+  // Returns all the actions attached to this filter.
+  const std::vector<process::Shared<action::Action> >& actions() const
+  {
+    return actions_;
+  }
+
+private:
+  // Each filter is attached to a queueing object (either a queueing
+  // discipline or a queueing class).
+  queueing::Handle parent_;
+
+  // The filter specific classifier.
+  Classifier classifier_;
+
+  // The priority of this filter.
+  Option<Priority> priority_;
+
+  // The actions attached to this filter, in attachment order. Note
+  // that we use Shared here to make Filter copyable.
+  std::vector<process::Shared<action::Action> > actions_;
+};
+
+} // namespace filter {
+} // namespace routing {
+
+#endif // __LINUX_ROUTING_FILTER_FILTER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3fe0246/src/linux/routing/filter/internal.hpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/filter/internal.hpp b/src/linux/routing/filter/internal.hpp
new file mode 100644
index 0000000..cd3279a
--- /dev/null
+++ b/src/linux/routing/filter/internal.hpp
@@ -0,0 +1,649 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LINUX_ROUTING_FILTER_INTERNAL_HPP__
+#define __LINUX_ROUTING_FILTER_INTERNAL_HPP__
+
+#include <stdint.h>
+
+#include <netlink/cache.h>
+#include <netlink/errno.h>
+#include <netlink/object.h>
+#include <netlink/socket.h>
+
+#include <netlink/route/classifier.h>
+#include <netlink/route/link.h>
+#include <netlink/route/tc.h>
+
+#include <netlink/route/act/mirred.h>
+
+#include <netlink/route/cls/basic.h>
+#include <netlink/route/cls/u32.h>
+
+#include <string>
+#include <vector>
+
+#include <process/shared.hpp>
+
+#include <stout/error.hpp>
+#include <stout/foreach.hpp>
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+#include "linux/routing/internal.hpp"
+
+#include "linux/routing/filter/action.hpp"
+#include "linux/routing/filter/filter.hpp"
+#include "linux/routing/filter/priority.hpp"
+
+#include "linux/routing/link/internal.hpp"
+
+#include "linux/routing/queueing/handle.hpp"
+
+namespace routing {
+namespace filter {
+namespace internal {
+
+/////////////////////////////////////////////////
+// Helpers for {en,de}coding.
+/////////////////////////////////////////////////
+
+// Forward declaration. Each type of classifier needs to implement
+// this function to encode itself into the libnl filter (rtnl_cls).
+template <typename Classifier>
+Try<Nothing> encode(
+    const Netlink<struct rtnl_cls>& cls,
+    const Classifier& classifier);
+
+
+// Forward declaration. Each type of classifier needs to implement
+// this function to decode itself from the libnl filter (rtnl_cls).
+// Returns None if the libnl filter does not match the type of the
+// classifier.
+template <typename Classifier>
+Result<Classifier> decode(const Netlink<struct rtnl_cls>& cls);
+
+
+// Attaches a redirect action (mirred, egress) to the libnl filter 'cls'.
+inline Try<Nothing> attach(
+    const Netlink<struct rtnl_cls>& cls,
+    const action::Redirect& redirect)
+{
+  Result<Netlink<struct rtnl_link> > link =
+    link::internal::get(redirect.link());
+
+  if (link.isError()) {
+    return Error(link.error());
+  } else if (link.isNone()) {
+    return Error("Link '" + redirect.link() + "' is not found");
+  }
+
+  // TODO(jieyu): 'act' is a raw pointer (released via rtnl_act_put on
+  // the error paths below) rather than a Netlink because libnl has a
+  // refcount issue for rtnl_act. Clean this up once libnl fixes it.
+  struct rtnl_act* act = rtnl_act_alloc();
+  if (act == NULL) {
+    return Error("Failed to allocate a libnl action (rtnl_act)");
+  }
+
+  // Set the kind of the action to 'mirred'. The kind 'mirred' stands
+  // for mirror or redirect actions.
+  int err = rtnl_tc_set_kind(TC_CAST(act), "mirred");
+  if (err != 0) {
+    rtnl_act_put(act);
+    return Error(
+        "Failed to set the kind of the action: " +
+        std::string(nl_geterror(err)));
+  }
+
+  rtnl_mirred_set_ifindex(act, rtnl_link_get_ifindex(link.get().get()));
+  rtnl_mirred_set_action(act, TCA_EGRESS_REDIR);
+  rtnl_mirred_set_policy(act, TC_ACT_STOLEN);
+
+  const std::string kind = rtnl_tc_get_kind(TC_CAST(cls.get()));
+  if (kind == "basic") {
+    err = rtnl_basic_add_action(cls.get(), act);
+    if (err != 0) {
+      rtnl_act_put(act);
+      return Error(std::string(nl_geterror(err)));
+    }
+  } else if (kind == "u32") {
+    err = rtnl_u32_add_action(cls.get(), act);
+    if (err != 0) {
+      rtnl_act_put(act);
+      return Error(std::string(nl_geterror(err)));
+    }
+  } else {
+    rtnl_act_put(act);
+    return Error("Unsupported classifier kind: " + kind);
+  }
+
+  return Nothing();
+}
+
+
+// Attaches one mirror action per link in 'mirror' to the libnl filter.
+inline Try<Nothing> attach(
+    const Netlink<struct rtnl_cls>& cls,
+    const action::Mirror& mirror)
+{
+  foreach (const std::string& _link, mirror.links()) {
+    Result<Netlink<struct rtnl_link> > link = link::internal::get(_link);
+    if (link.isError()) {
+      return Error(link.error());
+    } else if (link.isNone()) {
+      return Error("Link '" + _link + "' is not found");
+    }
+
+    // TODO(jieyu): 'act' is a raw pointer (released via rtnl_act_put
+    // on the error paths below) rather than a Netlink because libnl
+    // has a refcount issue for rtnl_act. Clean this up once fixed.
+    struct rtnl_act* act = rtnl_act_alloc();
+    if (act == NULL) {
+      return Error("Failed to allocate a libnl action (rtnl_act)");
+    }
+
+    int err = rtnl_tc_set_kind(TC_CAST(act), "mirred");
+    if (err != 0) {
+      rtnl_act_put(act);
+      return Error(
+          "Failed to set the kind of the action: " +
+          std::string(nl_geterror(err)));
+    }
+
+    rtnl_mirred_set_ifindex(act, rtnl_link_get_ifindex(link.get().get()));
+    rtnl_mirred_set_action(act, TCA_EGRESS_MIRROR);
+    rtnl_mirred_set_policy(act, TC_ACT_PIPE);
+
+    const std::string kind = rtnl_tc_get_kind(TC_CAST(cls.get()));
+    if (kind == "basic") {
+      err = rtnl_basic_add_action(cls.get(), act);
+      if (err != 0) {
+        rtnl_act_put(act);
+        return Error(std::string(nl_geterror(err)));
+      }
+    } else if (kind == "u32") {
+      err = rtnl_u32_add_action(cls.get(), act);
+      if (err != 0) {
+        rtnl_act_put(act);
+        return Error(std::string(nl_geterror(err)));
+      }
+    } else {
+      rtnl_act_put(act);
+      return Error("Unsupported classifier kind: " + kind);
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Attaches an action to the libnl filter (rtnl_cls) by dispatching on
+// the dynamic type of the action (Redirect or Mirror) to the matching
+// attach overload. Returns an error for any other action type.
+inline Try<Nothing> attach(
+    const Netlink<struct rtnl_cls>& cls,
+    const process::Shared<action::Action>& action)
+{
+  const action::Redirect* redirect =
+    dynamic_cast<const action::Redirect*>(action.get());
+  if (redirect != NULL) {
+    return attach(cls, *redirect);
+  }
+
+  const action::Mirror* mirror =
+    dynamic_cast<const action::Mirror*>(action.get());
+  if (mirror != NULL) {
+    return attach(cls, *mirror);
+  }
+
+  return Error("Unsupported action type");
+}
+
+
+// Encodes a filter (in our representation) to a libnl filter
+// (rtnl_cls) bound to the given link. Templated so that it works for
+// any type of classifier.
+template <typename Classifier>
+Try<Netlink<struct rtnl_cls> > encodeFilter(
+    const Netlink<struct rtnl_link>& link,
+    const Filter<Classifier>& filter)
+{
+  struct rtnl_cls* c = rtnl_cls_alloc();
+  if (c == NULL) {
+    return Error("Failed to allocate a libnl filter (rtnl_cls)");
+  }
+
+  Netlink<struct rtnl_cls> cls(c);
+
+  rtnl_tc_set_link(TC_CAST(cls.get()), link.get());
+  rtnl_tc_set_parent(TC_CAST(cls.get()), filter.parent().get());
+
+  // Encode the priority.
+  if (filter.priority().isSome()) {
+    rtnl_cls_set_prio(cls.get(), filter.priority().get().get());
+  }
+
+  // Encode the classifier using the classifier specific function.
+  Try<Nothing> encoding = encode(cls, filter.classifier());
+  if (encoding.isError()) {
+    return Error("Failed to encode the classifier: " + encoding.error());
+  }
+
+  // Stop the packet from being passed to the next filter if a match
+  // is found. This is only applied to u32 filters (which can match
+  // any 32-bit value in a packet), and is not needed for other types
+  // of filters.
+  // TODO(jieyu): Consider setting flowid.
+  if (rtnl_tc_get_kind(TC_CAST(cls.get())) == std::string("u32")) {
+    int err = rtnl_u32_set_cls_terminal(cls.get());
+    if (err != 0) {
+      return Error(
+          "Failed to mark the libnl filter as a terminal: " +
+          std::string(nl_geterror(err)));
+    }
+  }
+
+  // Attach actions to the libnl filter.
+  foreach (const process::Shared<action::Action>& action, filter.actions()) {
+    Try<Nothing> attaching = attach(cls, action);
+    if (attaching.isError()) {
+      return Error("Failed to attach an action: " + attaching.error());
+    }
+  }
+
+  return cls;
+}
+
+
+// Decodes a libnl filter (rtnl_cls) to our filter representation.
+// Returns None if the libnl filter does not match the specified
+// classifier type. We use template here so that it works for any type
+// of classifier.
+template <typename Classifier>
+Result<Filter<Classifier> > decodeFilter(const Netlink<struct rtnl_cls>& cls)
+{
+  // If the handle of the libnl filter is 0, it means that it is an
+  // internal filter, and therefore is definitely not created by us.
+  if (rtnl_tc_get_handle(TC_CAST(cls.get())) == 0) {
+    return None();
+  }
+
+  // Decode the parent.
+  queueing::Handle parent(rtnl_tc_get_parent(TC_CAST(cls.get())));
+
+  // Decode the priority. If the priority is not specified by the
+  // user, kernel will assign a priority to the filter. So we should
+  // always have a valid priority here.
+  Priority priority(rtnl_cls_get_prio(cls.get()));
+
+  // Decode the classifier using a classifier specific function.
+  Result<Classifier> classifier = decode<Classifier>(cls);
+  if (classifier.isError()) {
+    return Error("Failed to decode the classifier: " + classifier.error());
+  } else if (classifier.isNone()) {
+    return None();
+  }
+
+  // TODO(jieyu): Decode all the actions attached to the filter.
+  // Currently, libnl does not support that (but will in the future),
+  // so the returned filter carries no actions.
+  return Filter<Classifier>(parent, classifier.get(), priority);
+}
+
+/////////////////////////////////////////////////
+// Helpers for internal APIs.
+/////////////////////////////////////////////////
+
+// Returns all the libnl filters (rtnl_cls) attached to the given
+// parent on the link, or an error if the kernel query fails.
+inline Try<std::vector<Netlink<struct rtnl_cls> > > getClses(
+    const Netlink<struct rtnl_link>& link,
+    const queueing::Handle& parent)
+{
+  Try<Netlink<struct nl_sock> > sock = routing::socket();
+  if (sock.isError()) {
+    return Error(sock.error());
+  }
+
+  // Dump all the libnl filters (i.e., rtnl_cls) attached to the given
+  // parent on the link into a newly allocated cache 'c'.
+  struct nl_cache* c = NULL;
+  int err = rtnl_cls_alloc_cache(
+      sock.get().get(),
+      rtnl_link_get_ifindex(link.get()),
+      parent.get(),
+      &c);
+
+  if (err != 0) {
+    return Error(
+        "Failed to get filter info from kernel: " +
+        std::string(nl_geterror(err)));
+  }
+
+  Netlink<struct nl_cache> cache(c);
+
+  std::vector<Netlink<struct rtnl_cls> > results;
+
+  for (struct nl_object* o = nl_cache_get_first(cache.get());
+       o != NULL; o = nl_cache_get_next(o)) {
+    nl_object_get(o); // Take a reference for the copy kept in 'results'.
+    results.push_back(Netlink<struct rtnl_cls>((struct rtnl_cls*) o));
+  }
+
+  return results;
+}
+
+
+// Returns the first libnl filter (rtnl_cls) attached to the given
+// parent on the link whose decoded classifier equals 'classifier'
+// (compared using operator==). Returns None if no match is found.
+// Templated so that it works for any type of classifier.
+template <typename Classifier>
+Result<Netlink<struct rtnl_cls> > getCls(
+    const Netlink<struct rtnl_link>& link,
+    const queueing::Handle& parent,
+    const Classifier& classifier)
+{
+  Try<std::vector<Netlink<struct rtnl_cls> > > clses = getClses(link, parent);
+  if (clses.isError()) {
+    return Error(clses.error());
+  }
+
+  foreach (const Netlink<struct rtnl_cls>& cls, clses.get()) {
+    // The decode function will return None if 'cls' does not match
+    // the classifier type. In that case, we just move on to the next
+    // libnl filter.
+    Result<Filter<Classifier> > filter = decodeFilter<Classifier>(cls);
+    if (filter.isError()) {
+      return Error("Failed to decode: " + filter.error());
+    } else if (filter.isSome() && filter.get().classifier() == classifier) {
+      return cls;
+    }
+  }
+
+  return None();
+}
+
+/////////////////////////////////////////////////
+// Internal filter APIs.
+/////////////////////////////////////////////////
+
+// Returns true if a filter matching the specified classifier is
+// attached to the given parent on the link, and false if there is no
+// such filter or the link is not found. Works for any classifier type.
+template <typename Classifier>
+Try<bool> exists(
+    const std::string& _link,
+    const queueing::Handle& parent,
+    const Classifier& classifier)
+{
+  Result<Netlink<struct rtnl_link> > link = link::internal::get(_link);
+  if (link.isError()) {
+    return Error(link.error());
+  } else if (link.isNone()) {
+    return false;
+  }
+
+  Result<Netlink<struct rtnl_cls> > cls =
+    getCls(link.get(), parent, classifier);
+
+  if (cls.isError()) {
+    return Error(cls.error());
+  }
+  return cls.isSome();
+}
+
+
+// Creates a new filter on the link. Returns false if a filter
+// attached to the same parent with the same classifier already
+// exists, and an error if the link is not found. Templated so that
+// it works for any type of classifier.
+template <typename Classifier>
+Try<bool> create(const std::string& _link, const Filter<Classifier>& filter)
+{
+  // TODO(jieyu): Currently, we're not able to guarantee the atomicity
+  // between the existence check and the following add operation. So
+  // if two threads try to create the same filter, both of them may
+  // succeed and end up with two filters in the kernel.
+  Try<bool> _exists = exists(_link, filter.parent(), filter.classifier());
+  if (_exists.isError()) {
+    return Error("Check filter existence failed: " + _exists.error());
+  } else if (_exists.get()) {
+    // The filter already exists.
+    return false;
+  }
+
+  Result<Netlink<struct rtnl_link> > link = link::internal::get(_link);
+  if (link.isError()) {
+    return Error(link.error());
+  } else if (link.isNone()) {
+    return Error("Link '" + _link + "' is not found");
+  }
+
+  Try<Netlink<struct rtnl_cls> > cls = encodeFilter(link.get(), filter);
+  if (cls.isError()) {
+    return Error("Failed to encode the filter: " + cls.error());
+  }
+
+  Try<Netlink<struct nl_sock> > sock = routing::socket();
+  if (sock.isError()) {
+    return Error(sock.error());
+  }
+
+  int err = rtnl_cls_add(
+      sock.get().get(),
+      cls.get().get(),
+      NLM_F_CREATE | NLM_F_EXCL);
+
+  if (err != 0) {
+    if (err == -NLE_EXIST) {
+      return false;
+    } else {
+      return Error(std::string(nl_geterror(err)));
+    }
+  }
+
+  return true;
+}
+
+
+// Removes the filter attached to the given parent that matches the
+// specified classifier from the link. Returns false if the link or
+// such a filter is not found. Templated so that it works for any
+// type of classifier.
+template <typename Classifier>
+Try<bool> remove(
+    const std::string& _link,
+    const queueing::Handle& parent,
+    const Classifier& classifier)
+{
+  Result<Netlink<struct rtnl_link> > link = link::internal::get(_link);
+  if (link.isError()) {
+    return Error(link.error());
+  } else if (link.isNone()) {
+    return false;
+  }
+
+  Result<Netlink<struct rtnl_cls> > cls =
+    getCls(link.get(), parent, classifier);
+
+  if (cls.isError()) {
+    return Error(cls.error());
+  } else if (cls.isNone()) {
+    return false;
+  }
+
+  Try<Netlink<struct nl_sock> > sock = routing::socket();
+  if (sock.isError()) {
+    return Error(sock.error());
+  }
+
+  int err = rtnl_cls_delete(sock.get().get(), cls.get().get(), 0);
+  if (err != 0) {
+    // TODO(jieyu): Interpret the error code and return false if it
+    // indicates that the filter is not found.
+    return Error(std::string(nl_geterror(err)));
+  }
+
+  return true;
+}
+
+
+// Updates the action of the filter attached to the given parent that
+// matches the specified classifier on the link. Returns false if the
+// link or such a filter is not found. Templated so that it works for
+// any type of classifier.
+template <typename Classifier>
+Try<bool> update(const std::string& _link, const Filter<Classifier>& filter)
+{
+  Result<Netlink<struct rtnl_link> > link = link::internal::get(_link);
+  if (link.isError()) {
+    return Error(link.error());
+  } else if (link.isNone()) {
+    return false;
+  }
+
+  // Get the old libnl classifier (to-be-updated) from kernel.
+  Result<Netlink<struct rtnl_cls> > oldCls =
+    getCls(link.get(), filter.parent(), filter.classifier());
+
+  if (oldCls.isError()) {
+    return Error(oldCls.error());
+  } else if (oldCls.isNone()) {
+    return false;
+  }
+
+  // The kernel does not allow us to update the priority. So if the
+  // user specifies a priority, we will check to make sure they match.
+  if (filter.priority().isSome() &&
+      filter.priority().get().get() != rtnl_cls_get_prio(oldCls.get().get())) {
+    return Error(
+        "The priorities do not match. The old priority is " +
+        stringify(rtnl_cls_get_prio(oldCls.get().get())) +
+        " and the new priority is " +
+        stringify(filter.priority().get().get()));
+  }
+
+  Try<Netlink<struct rtnl_cls> > newCls = encodeFilter(link.get(), filter);
+  if (newCls.isError()) {
+    return Error("Failed to encode the new filter: " + newCls.error());
+  }
+
+  // Set the handle of the new filter to match that of the old one.
+  rtnl_tc_set_handle(
+      TC_CAST(newCls.get().get()),
+      rtnl_tc_get_handle(TC_CAST(oldCls.get().get())));
+
+  // Set the priority of the new filter to match that of the old one.
+  rtnl_cls_set_prio(
+      newCls.get().get(),
+      rtnl_cls_get_prio(oldCls.get().get()));
+
+  Try<Netlink<struct nl_sock> > sock = routing::socket();
+  if (sock.isError()) {
+    return Error(sock.error());
+  }
+
+  int err = rtnl_cls_change(sock.get().get(), newCls.get().get(), 0);
+  if (err != 0) {
+    if (err == -NLE_OBJ_NOTFOUND) {
+      return false;
+    } else {
+      return Error(std::string(nl_geterror(err)));
+    }
+  }
+
+  return true;
+}
+
+
+// Returns all the filters of the given classifier type attached to
+// the given parent on the link. Returns None if the link is not
+// found. Templated so that it works for any type of classifier.
+template <typename Classifier>
+Result<std::vector<Filter<Classifier> > > filters(
+    const std::string& _link,
+    const queueing::Handle& parent)
+{
+  Result<Netlink<struct rtnl_link> > link = link::internal::get(_link);
+  if (link.isError()) {
+    return Error(link.error());
+  } else if (link.isNone()) {
+    return None();
+  }
+
+  Try<std::vector<Netlink<struct rtnl_cls> > > clses =
+    getClses(link.get(), parent);
+
+  if (clses.isError()) {
+    return Error(clses.error());
+  }
+
+  std::vector<Filter<Classifier> > results;
+
+  foreach (const Netlink<struct rtnl_cls>& cls, clses.get()) {
+    // The decode function will return None if 'cls' does not match
+    // the classifier type. In that case, we just move on to the next
+    // libnl filter.
+    Result<Filter<Classifier> > filter = decodeFilter<Classifier>(cls);
+    if (filter.isError()) {
+      return Error(filter.error());
+    } else if (filter.isSome()) {
+      results.push_back(filter.get());
+    }
+  }
+
+  return results;
+}
+
+
+// Returns all the classifiers attached to the given parent on the
+// link, i.e., the classifier of each filter returned by 'filters'.
+// Returns None whenever 'filters' returns None.
+template <typename Classifier>
+Result<std::vector<Classifier> > classifiers(
+    const std::string& link,
+    const queueing::Handle& parent)
+{
+  Result<std::vector<Filter<Classifier> > > _filters =
+    filters<Classifier>(link, parent);
+
+  if (_filters.isError()) {
+    return Error(_filters.error());
+  } else if (_filters.isNone()) {
+    return None();
+  }
+
+  std::vector<Classifier> results;
+
+  foreach (const Filter<Classifier>& filter, _filters.get()) {
+    results.push_back(filter.classifier());
+  }
+
+  return results;
+}
+
+} // namespace internal {
+} // namespace filter {
+} // namespace routing {
+
+#endif // __LINUX_ROUTING_FILTER_INTERNAL_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3fe0246/src/linux/routing/filter/priority.hpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/filter/priority.hpp b/src/linux/routing/filter/priority.hpp
new file mode 100644
index 0000000..699ad17
--- /dev/null
+++ b/src/linux/routing/filter/priority.hpp
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LINUX_ROUTING_FILTER_PRIORITY_HPP__
+#define __LINUX_ROUTING_FILTER_PRIORITY_HPP__
+
+#include <stdint.h>
+
+namespace routing {
+namespace filter {
+
+// Each filter on a link has a priority (a 16-bit integer) which
+// defines the order in which filters are applied to each packet. The
+// lower the number, the higher the priority. In order to deal with
+// filters of different types, the priority is split into two 8-bit
+// parts: the primary part (the upper 8 bits) and the secondary part
+// (the lower 8 bits). When two priorities are compared, the primary
+// parts are compared first and the secondary parts are only compared
+// if the primary parts are equal. Typically, the primary part is
+// used to define the preference order between different types of
+// filters. For example, a user may want ICMP packet filters to always
+// be applied before IP packet filters. In that case, the user can
+// assign ICMP packet filters a higher priority in the primary part.
+// The secondary part can then be used to further order the filters
+// of the same type.
+class Priority
+{
+public:
+  // Splits a raw 16-bit priority into its two 8-bit parts.
+  explicit Priority(uint16_t priority)
+    : primary(static_cast<uint8_t>(priority >> 8)),
+      secondary(static_cast<uint8_t>(priority & 0xff)) {}
+
+  Priority(uint8_t _primary, uint8_t _secondary)
+    : primary(_primary), secondary(_secondary) {}
+
+  // Returns the raw 16-bit priority with the primary part in the
+  // upper 8 bits and the secondary part in the lower 8 bits.
+  uint16_t get() const
+  {
+    return static_cast<uint16_t>((primary << 8) | secondary);
+  }
+
+private:
+  uint8_t primary;
+  uint8_t secondary;
+};
+
+} // namespace filter {
+} // namespace routing {
+
+#endif // __LINUX_ROUTING_FILTER_PRIORITY_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3fe0246/src/linux/routing/queueing/handle.cpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/queueing/handle.cpp b/src/linux/routing/queueing/handle.cpp
new file mode 100644
index 0000000..cd34fc4
--- /dev/null
+++ b/src/linux/routing/queueing/handle.cpp
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <netlink/route/tc.h>
+
+#include "linux/routing/queueing/handle.hpp"
+
+namespace routing {
+namespace queueing {
+
+// Definitions of the handles declared 'extern' in handle.hpp. They
+// wrap the traffic control constants TC_H_INGRESS and TC_H_ROOT.
+const Handle INGRESS_ROOT = Handle(TC_H_INGRESS);
+const Handle EGRESS_ROOT = Handle(TC_H_ROOT);
+
+} // namespace queueing {
+} // namespace routing {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3fe0246/src/linux/routing/queueing/handle.hpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/queueing/handle.hpp b/src/linux/routing/queueing/handle.hpp
new file mode 100644
index 0000000..2725d07
--- /dev/null
+++ b/src/linux/routing/queueing/handle.hpp
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LINUX_ROUTING_QUEUEING_HANDLE_HPP__
+#define __LINUX_ROUTING_QUEUEING_HANDLE_HPP__
+
+#include <stdint.h>
+
+namespace routing {
+namespace queueing {
+
+// A handle identifies a queueing object (either a queueing
+// discipline or a queueing class). It is a 32-bit number that can
+// also be specified as a pair of 16-bit numbers: the primary number
+// (the upper 16 bits) and the secondary number (the lower 16 bits).
+// This is modeled after the traffic control object handle used in
+// kernel.
+class Handle
+{
+public:
+  explicit Handle(uint32_t _handle) : handle(_handle) {}
+
+  // Combines the primary and the secondary number into one handle.
+  Handle(uint16_t primary, uint16_t secondary)
+    : handle((static_cast<uint32_t>(primary) << 16) | secondary) {}
+
+  // Returns the raw 32-bit handle.
+  uint32_t get() const { return handle; }
+
+private:
+  uint32_t handle;
+};
+
+
+// Packets flowing from the device driver to the network stack are
+// called ingress traffic, and packets flowing from the network stack
+// to the device driver are called egress traffic (shown below).
+//
+//        +---------+
+//        | Network |
+//        |  Stack  |
+//        |---------|
+//        |  eth0   |
+//        +---------+
+//           ^   |
+//   Ingress |   | Egress
+//           |   |
+//    -------+   +------>
+
+
+// The parent of the root ingress queueing discipline.
+extern const Handle INGRESS_ROOT;
+
+
+// The parent of the root egress queueing discipline.
+extern const Handle EGRESS_ROOT;
+
+} // namespace queueing {
+} // namespace routing {
+
+#endif // __LINUX_ROUTING_QUEUEING_HANDLE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3fe0246/src/linux/routing/queueing/ingress.cpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/queueing/ingress.cpp b/src/linux/routing/queueing/ingress.cpp
new file mode 100644
index 0000000..e696950
--- /dev/null
+++ b/src/linux/routing/queueing/ingress.cpp
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <netlink/errno.h>
+
+#include <netlink/route/qdisc.h>
+#include <netlink/route/tc.h>
+
+#include <stout/error.hpp>
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/result.hpp>
+
+#include "linux/routing/queueing/handle.hpp"
+#include "linux/routing/queueing/ingress.hpp"
+#include "linux/routing/queueing/internal.hpp"
+
+using std::string;
+
+namespace routing {
+namespace queueing {
+
+namespace ingress {
+
+// The ingress queueing discipline is not exposed to the user. It
+// carries no state, hence any two instances compare equal.
+struct Discipline
+{
+  bool operator == (const Discipline&) const { return true; }
+};
+
+} // namespace ingress {
+
+/////////////////////////////////////////////////
+// Type specific {en}decoding functions.
+/////////////////////////////////////////////////
+
+namespace internal {
+
+// Fills in the libnl queueing discipline 'qdisc' so that it
+// describes an ingress queueing discipline: its kind is set to
+// "ingress", its parent to INGRESS_ROOT and its handle to the fixed
+// ingress handle. Returns error if the kind cannot be set. Each type
+// of queueing discipline needs to implement this function.
+template <>
+Try<Nothing> encode<ingress::Discipline>(
+    const Netlink<struct rtnl_qdisc>& qdisc,
+    const ingress::Discipline& discipline)
+{
+  struct rtnl_tc* tc = TC_CAST(qdisc.get());
+
+  const int error = rtnl_tc_set_kind(tc, "ingress");
+  if (error != 0) {
+    return Error(
+        "Failed to set the kind of the queueing discipline: " +
+        string(nl_geterror(error)));
+  }
+
+  rtnl_tc_set_parent(tc, INGRESS_ROOT.get());
+  rtnl_tc_set_handle(tc, ingress::HANDLE.get());
+
+  return Nothing();
+}
+
+
+// Decodes the ingress queueing discipline from the libnl queueing
+// discipline 'qdisc'. Each type of queueing discipline needs to
+// implement this function. Returns None if the libnl queueing
+// discipline is not an ingress queueing discipline, i.e., if its
+// kind is not "ingress" (or not set at all), or if its parent or its
+// handle are not the ones used by the ingress queueing discipline.
+template <>
+Result<ingress::Discipline> decode<ingress::Discipline>(
+    const Netlink<struct rtnl_qdisc>& qdisc)
+{
+  struct rtnl_tc* tc = TC_CAST(qdisc.get());
+
+  // NOTE: 'rtnl_tc_get_kind' returns NULL if the kind has not been
+  // set. Comparing a NULL 'char*' with a string is undefined
+  // behavior, so we check for NULL explicitly and treat it as "not
+  // an ingress queueing discipline".
+  const char* kind = rtnl_tc_get_kind(tc);
+  if (kind == NULL || string("ingress") != kind) {
+    return None();
+  }
+
+  if (rtnl_tc_get_parent(tc) != INGRESS_ROOT.get() ||
+      rtnl_tc_get_handle(tc) != ingress::HANDLE.get()) {
+    return None();
+  }
+
+  return ingress::Discipline();
+}
+
+} // namespace internal {
+
+/////////////////////////////////////////////////
+// Public interfaces.
+/////////////////////////////////////////////////
+
+namespace ingress {
+
+// Definition of the fixed ingress handle declared in ingress.hpp:
+// primary number 0xffff and secondary number 0.
+const Handle HANDLE = Handle(0xffff, 0);
+
+
+// Delegates to the generic internal helper using the ingress
+// queueing discipline.
+Try<bool> exists(const string& link)
+{
+  const Discipline discipline = Discipline();
+  return internal::exists(link, discipline);
+}
+
+
+// Delegates to the generic internal helper using the ingress
+// queueing discipline.
+Try<bool> create(const string& link)
+{
+  const Discipline discipline = Discipline();
+  return internal::create(link, discipline);
+}
+
+
+// Delegates to the generic internal helper using the ingress
+// queueing discipline.
+Try<bool> remove(const string& link)
+{
+  const Discipline discipline = Discipline();
+  return internal::remove(link, discipline);
+}
+
+} // namespace ingress {
+} // namespace queueing {
+} // namespace routing {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3fe0246/src/linux/routing/queueing/ingress.hpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/queueing/ingress.hpp b/src/linux/routing/queueing/ingress.hpp
new file mode 100644
index 0000000..b323a7f
--- /dev/null
+++ b/src/linux/routing/queueing/ingress.hpp
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LINUX_ROUTING_QUEUEING_INGRESS_HPP__
+#define __LINUX_ROUTING_QUEUEING_INGRESS_HPP__
+
+#include <string>
+
+#include <stout/try.hpp>
+
+#include "linux/routing/queueing/handle.hpp"
+
+namespace routing {
+namespace queueing {
+namespace ingress {
+
+// The handle of the ingress queueing discipline is fixed.
+extern const Handle HANDLE;
+
+
+// Returns true if there exists an ingress qdisc on the link.
+Try<bool> exists(const std::string& link);
+
+
+// Creates a new ingress qdisc on the link. Returns false if an
+// ingress qdisc already exists on the link.
+Try<bool> create(const std::string& link);
+
+
+// Removes the ingress qdisc on the link. Returns false if the
+// ingress qdisc is not found.
+Try<bool> remove(const std::string& link);
+
+} // namespace ingress {
+} // namespace queueing {
+} // namespace routing {
+
+#endif // __LINUX_ROUTING_QUEUEING_INGRESS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3fe0246/src/linux/routing/queueing/internal.hpp
----------------------------------------------------------------------
diff --git a/src/linux/routing/queueing/internal.hpp b/src/linux/routing/queueing/internal.hpp
new file mode 100644
index 0000000..4d8bd55
--- /dev/null
+++ b/src/linux/routing/queueing/internal.hpp
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LINUX_ROUTING_QUEUEING_INTERNAL_HPP__
+#define __LINUX_ROUTING_QUEUEING_INTERNAL_HPP__
+
+#include <netlink/cache.h>
+#include <netlink/errno.h>
+#include <netlink/object.h>
+#include <netlink/socket.h>
+
+#include <netlink/route/link.h>
+#include <netlink/route/qdisc.h>
+#include <netlink/route/tc.h>
+
+#include <string>
+#include <vector>
+
+#include <stout/error.hpp>
+#include <stout/foreach.hpp>
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+#include "linux/routing/internal.hpp"
+
+#include "linux/routing/link/internal.hpp"
+
+#include "linux/routing/queueing/handle.hpp"
+
+namespace routing {
+namespace queueing {
+namespace internal {
+
+/////////////////////////////////////////////////
+// Helpers for {en}decoding.
+/////////////////////////////////////////////////
+
+// Forward declaration. Each type of queueing discipline needs to
+// implement this function to encode itself into the libnl queueing
+// discipline (rtnl_qdisc).
+template <typename Discipline>
+Try<Nothing> encode(
+    const Netlink<struct rtnl_qdisc>& qdisc,
+    const Discipline& discipline);
+
+
+// Forward declaration. Each type of queueing discipline needs to
+// implement this function to decode itself from the libnl queueing
+// discipline (rtnl_qdisc). Returns None if the libnl queueing
+// discipline does not match the specified queueing discipline type.
+template <typename Discipline>
+Result<Discipline> decode(const Netlink<struct rtnl_qdisc>& qdisc);
+
+
+// Builds a libnl queueing discipline (rtnl_qdisc) for the given link
+// from a queueing discipline in our representation. The type
+// specific part of the encoding is delegated to the 'encode'
+// function that each type of queueing discipline implements. Returns
+// error if the libnl queueing discipline cannot be allocated or if
+// the type specific encoding fails. This is a template so that it
+// works for any type of queueing discipline.
+template <typename Discipline>
+Try<Netlink<struct rtnl_qdisc> > encode(
+    const Netlink<struct rtnl_link>& link,
+    const Discipline& discipline)
+{
+  struct rtnl_qdisc* allocated = rtnl_qdisc_alloc();
+  if (allocated == NULL) {
+    return Error("Failed to allocate a libnl qdisc");
+  }
+
+  Netlink<struct rtnl_qdisc> wrapped(allocated);
+
+  // Bind the queueing discipline to the link.
+  rtnl_tc_set_link(TC_CAST(wrapped.get()), link.get());
+
+  // Perform the queueing discipline specific encoding.
+  Try<Nothing> specific = encode(wrapped, discipline);
+  if (!specific.isError()) {
+    return wrapped;
+  }
+
+  return Error(
+      "Failed to encode the queueing discipline: " +
+      specific.error());
+}
+
+/////////////////////////////////////////////////
+// Helpers for internal APIs.
+/////////////////////////////////////////////////
+
+// Returns all the libnl queueing disciplines (rtnl_qdisc) on the
+// link. Returns error if the netlink socket cannot be obtained or if
+// the queueing disciplines cannot be retrieved from kernel.
+inline Try<std::vector<Netlink<struct rtnl_qdisc> > > getQdiscs(
+    const Netlink<struct rtnl_link>& link)
+{
+  Try<Netlink<struct nl_sock> > sock = routing::socket();
+  if (sock.isError()) {
+    return Error(sock.error());
+  }
+
+  // Dump all the queueing disciplines (of all the links) from kernel.
+  struct nl_cache* c = NULL;
+  int err = rtnl_qdisc_alloc_cache(sock.get().get(), &c);
+  if (err != 0) {
+    return Error(
+        "Failed to get queueing discipline info from kernel: " +
+        std::string(nl_geterror(err)));
+  }
+
+  Netlink<struct nl_cache> cache(c);
+
+  const int ifindex = rtnl_link_get_ifindex(link.get());
+
+  std::vector<Netlink<struct rtnl_qdisc> > results;
+
+  for (struct nl_object* o = nl_cache_get_first(cache.get());
+       o != NULL; o = nl_cache_get_next(o)) {
+    // The cache contains the queueing disciplines of every link.
+    // Skip those that are not attached to the specified link,
+    // otherwise callers could match (and even delete) a queueing
+    // discipline that belongs to a different link.
+    if (rtnl_tc_get_ifindex(TC_CAST(o)) != ifindex) {
+      continue;
+    }
+
+    // Increment the reference counter as the object is now also
+    // referenced from 'results'.
+    nl_object_get(o);
+    results.push_back(Netlink<struct rtnl_qdisc>((struct rtnl_qdisc*) o));
+  }
+
+  return results;
+}
+
+
+// Searches the libnl queueing disciplines returned by 'getQdiscs'
+// for the first one that decodes to a queueing discipline equal to
+// the specified one and returns it. Returns None if no match has
+// been found, and Error if the queueing disciplines cannot be
+// retrieved or decoded. This is a template so that it works for any
+// type of queueing discipline.
+template <typename Discipline>
+Result<Netlink<struct rtnl_qdisc> > getQdisc(
+    const Netlink<struct rtnl_link>& link,
+    const Discipline& discipline)
+{
+  Try<std::vector<Netlink<struct rtnl_qdisc> > > all = getQdiscs(link);
+  if (all.isError()) {
+    return Error(all.error());
+  }
+
+  foreach (const Netlink<struct rtnl_qdisc>& candidate, all.get()) {
+    Result<Discipline> decoded = decode<Discipline>(candidate);
+    if (decoded.isError()) {
+      return Error("Failed to decode: " + decoded.error());
+    }
+
+    // The decode function returns None if 'candidate' is not of the
+    // specified queueing discipline type. Just try the next one.
+    if (decoded.isNone()) {
+      continue;
+    }
+
+    if (decoded.get() == discipline) {
+      return candidate;
+    }
+  }
+
+  return None();
+}
+
+/////////////////////////////////////////////////
+// Internal queueing APIs.
+/////////////////////////////////////////////////
+
+// Checks whether the specified queueing discipline exists on the
+// link. Returns false if the link itself is not found. This is a
+// template so that it works for any type of queueing discipline.
+template <typename Discipline>
+Try<bool> exists(const std::string& _link, const Discipline& discipline)
+{
+  Result<Netlink<struct rtnl_link> > target = link::internal::get(_link);
+  if (target.isError()) {
+    return Error(target.error());
+  }
+
+  // A link that does not exist has no queueing discipline.
+  if (target.isNone()) {
+    return false;
+  }
+
+  Result<Netlink<struct rtnl_qdisc> > match =
+    getQdisc(target.get(), discipline);
+
+  if (match.isError()) {
+    return Error(match.error());
+  }
+
+  return match.isSome();
+}
+
+
+// Creates a new queueing discipline on the link. Returns true if the
+// queueing discipline has been created and false if the same
+// queueing discipline already exists on the link (i.e., libnl
+// reports NLE_EXIST). Returns error if the link is not found, or if
+// the queueing discipline cannot be encoded or added to the link. We
+// use template here so that it works for any type of queueing
+// discipline.
+template <typename Discipline>
+Try<bool> create(const std::string& _link, const Discipline& discipline)
+{
+  Result<Netlink<struct rtnl_link> > link = link::internal::get(_link);
+  if (link.isError()) {
+    return Error(link.error());
+  } else if (link.isNone()) {
+    return Error("Link '" + _link + "' is not found");
+  }
+
+  Try<Netlink<struct rtnl_qdisc> > qdisc = encode(link.get(), discipline);
+  if (qdisc.isError()) {
+    // NOTE: The error returned by 'encode' is already descriptive:
+    // it either reports the allocation failure or it is prefixed
+    // with "Failed to encode the queueing discipline: ". Therefore,
+    // we do not prepend the same prefix a second time.
+    return Error(qdisc.error());
+  }
+
+  Try<Netlink<struct nl_sock> > sock = routing::socket();
+  if (sock.isError()) {
+    return Error(sock.error());
+  }
+
+  // The flag NLM_F_EXCL tells libnl that if the qdisc already exists,
+  // this function should return error.
+  int err = rtnl_qdisc_add(
+      sock.get().get(),
+      qdisc.get().get(),
+      NLM_F_CREATE | NLM_F_EXCL);
+
+  if (err != 0) {
+    if (err == -NLE_EXIST) {
+      return false;
+    }
+    return Error(
+        "Failed to add a queueing discipline to the link: " +
+        std::string(nl_geterror(err)));
+  }
+
+  return true;
+}
+
+
+// Deletes the specified queueing discipline from the link. Returns
+// true if it has been deleted and false if either the link or the
+// queueing discipline is not found. This is a template so that it
+// works for any type of queueing discipline.
+template <typename Discipline>
+Try<bool> remove(const std::string& _link, const Discipline& discipline)
+{
+  Result<Netlink<struct rtnl_link> > target = link::internal::get(_link);
+  if (target.isError()) {
+    return Error(target.error());
+  }
+
+  // Nothing to remove if the link itself does not exist.
+  if (target.isNone()) {
+    return false;
+  }
+
+  Result<Netlink<struct rtnl_qdisc> > existing =
+    getQdisc(target.get(), discipline);
+
+  if (existing.isError()) {
+    return Error(existing.error());
+  }
+
+  if (existing.isNone()) {
+    return false;
+  }
+
+  Try<Netlink<struct nl_sock> > sock = routing::socket();
+  if (sock.isError()) {
+    return Error(sock.error());
+  }
+
+  const int status =
+    rtnl_qdisc_delete(sock.get().get(), existing.get().get());
+
+  if (status == 0) {
+    return true;
+  }
+
+  // TODO(jieyu): Interpret the error code and return false if it
+  // indicates that the queueing discipline is not found.
+  return Error(std::string(nl_geterror(status)));
+}
+
+} // namespace internal {
+} // namespace queueing {
+} // namespace routing {
+
+#endif // __LINUX_ROUTING_QUEUEING_INTERNAL_HPP__