Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/28 17:49:43 UTC

[1/2] incubator-quickstep git commit: Added README for types module. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/policy-enforcer-dist 96fcc7791 -> fd296cf1b (forced update)


Added README for types module.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/33554c3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/33554c3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/33554c3e

Branch: refs/heads/policy-enforcer-dist
Commit: 33554c3edcac6becb84bfcdcdb8a60b9dd6a3f0b
Parents: 7415ee8
Author: Craig Chasseur <sp...@gmail.com>
Authored: Wed Jul 27 20:01:53 2016 -0700
Committer: Craig Chasseur <sp...@gmail.com>
Committed: Wed Jul 27 20:01:53 2016 -0700

----------------------------------------------------------------------
 types/README.md | 102 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 102 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/33554c3e/types/README.md
----------------------------------------------------------------------
diff --git a/types/README.md b/types/README.md
new file mode 100644
index 0000000..baf01aa
--- /dev/null
+++ b/types/README.md
@@ -0,0 +1,102 @@
+# The Quickstep Type System
+
+The types module is used across Quickstep and handles details of how data values
+are stored and represented, how they are parsed from and printed to
+human-readable text, and low-level operations on values that form the building
+blocks for more complex [expressions](../expressions).
+
+## The Type Class
+
+Every distinct concrete type in Quickstep is represented by a single object of
+a class derived from the base `quickstep::Type` class. All types have some
+common properties, including the following:
+
+  * A `TypeID` - an enum identifying the type, e.g. `kInt` for 32-bit integers,
+    or `kVarChar` for variable-length strings.
+  * Nullability - whether the type allows NULL values. All types have both a
+    nullable and a non-nullable flavor, except for `NullType`, a special type that
+    can ONLY store NULLs and has no non-nullable version.
+  * Storage size - minimum and maximum byte length. For fixed-length types like
+    basic numeric types and fixed-length `CHAR(X)` strings, these lengths are
+    the same. For variable-length types like `VARCHAR(X)`, they can be
+    different (and the `Type` class has a method `estimateAverageByteLength()`
+    that can be used to make educated guesses when allocating storage). Note
+    that storage requirements really only apply to uncompressed, non-NULL
+    values. The actual bytes needed to store the values in the
+    [storage system](../storage) may be different if
+    [compression](../compression) is used, and some storage formats might store
+    NULLs differently.
+
+Some categories of types have additional properties (e.g. `CharType` and
+`VarCharType` also have a length parameter that indicates the maximum length of
+string that can be stored).
+
+### Getting a Type
+
+Each distinct concrete `Type` is represented by a single object in the entire
+Quickstep process. To get a reference to a usable `Type`, most code goes
+through the `TypeFactory`. `TypeFactory` provides static methods to access
+specific types by `TypeID` and other parameters. It can also deserialize a type
+from its protobuf representation (a `quickstep::serialization::Type` message).
+Finally, it provides methods that determine a common `Type` to which two
+different types can both be cast.
+
+### More on the `Type` Interface
+
+In addition to methods that allow inspection of a type's properties (e.g. those
+listed above), the `Type` class defines an interface with useful functionality
+common to all types:
+
+  * Serialization (of the type itself) - the `getProto()` method produces a
+    protobuf message that can be serialized, deserialized, and later passed to
+    the `TypeFactory` to get back the same type.
+  * Relationship to other types - `equals()` determines if two types are exactly
+    the same, while `isCoercibleFrom()` determines if it is possible to convert
+    from another type to a given type (e.g. with a `CAST`), and
+    `isSafelyCoercibleFrom()` determines if such a conversion can always be done
+    without loss of precision.
+  * Printing to human-readable format - `printValueToString()` and
+    `printValueToFile()` can print out values of a type (see `TypedValue` below)
+    in human-readable format.
+  * Parsing from human-readable format - Similarly, `parseValueFromString()`
+    produces a `TypedValue` that is parsed from a string in human-readable
+    format.
+  * Making values - `makeValue()` creates a `TypedValue` from a bare pointer to
+    a value's representation in storage. For nullable types, `makeNullValue()`
+    makes a NULL value, and for numeric types, `makeZeroValue()` makes a zero
+    of that type.
+  * Coercing values - `coerceValue()` takes a value of another type and converts
+    it to the given type (e.g. as part of a `CAST`).
+
+## The TypedValue Class
+
+An individual typed value in Quickstep is represented by an instance of the
+`TypedValue` class. TypedValues can be created by methods of the `Type` class,
+by operation and expression classes that operate on values, or simply by calling
+one of several constructors provided in the class itself for convenience.
+TypedValues have C++ value semantics (i.e. they are copyable, assignable, and
+movable). A TypedValue may own its own data, or it may be a lightweight
+reference to data that is stored elsewhere in memory (this can be checked with
+`isReference()`, and any reference can be upgraded to own its own data copy by
+calling `ensureNotReference()`).
+
+Here are some of the things you can do with a TypedValue:
+
+  * NULL checks - calling `isNull()` determines if the TypedValue represents a
+    NULL. Several methods of TypedValue are usable only for non-NULL values, so
+    it is often important to check this first if in doubt.
+  * Access to underlying data - `getDataPtr()` returns an untyped `void*`
+    pointer to the underlying data, and `getDataSize()` returns the size of the
+    underlying data in bytes. Depending on the type of the value, the templated
+    method `getLiteral()` can be used to get the underlying data as a literal
+    scalar, or `getAsciiStringLength()` can be used to get the string length of
+    a `CHAR(X)` or `VARCHAR(X)` without counting null-terminators.
+  * Hashing - `getHash()` returns a hash of the value, which is suitable for
+    use in the HashTables of the [storage system](../storage), or in generic
+    hash-based C++ containers. `fastEqualCheck()` is provided to quickly check
+    whether two TypedValues of the same type (e.g. in the same hash table) are
+    actually equal.
+  * Serialization/Deserialization - `getProto()` serializes a TypedValue to a
+    `serialization::TypedValue` protobuf. The static method `ProtoIsValid()`
+    checks whether a serialized TypedValue is valid, and
+    `ReconstructFromProto()` rebuilds a TypedValue from its serialized form.
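The ownership rules for `TypedValue` described in the README (a value either references bytes stored elsewhere or owns a private copy, and `ensureNotReference()` upgrades the former to the latter) can be illustrated with a small self-contained C++ sketch. `ToyValue` and `MakeReference()` are hypothetical names; the method names mirror those in the README, but their signatures and the storage layout are assumptions, not Quickstep's actual `TypedValue`.

```cpp
#include <cstddef>
#include <string>

// Hypothetical, simplified model of a value that either references bytes
// stored elsewhere or owns a private copy. This is NOT Quickstep's TypedValue.
class ToyValue {
 public:
  // Creates a lightweight reference; the caller must keep 'data' alive.
  static ToyValue MakeReference(const void *data, std::size_t size) {
    ToyValue v;
    v.ref_ = data;
    v.size_ = size;
    return v;
  }

  bool isReference() const { return ref_ != nullptr; }

  // Points either at the external bytes or at the owned copy.
  const void* getDataPtr() const {
    return isReference() ? ref_ : static_cast<const void*>(owned_.data());
  }

  std::size_t getDataSize() const { return size_; }

  // Copies referenced bytes into owned storage, so the value no longer
  // depends on the lifetime of the original buffer.
  void ensureNotReference() {
    if (isReference()) {
      owned_.assign(static_cast<const char*>(ref_), size_);
      ref_ = nullptr;
    }
  }

 private:
  ToyValue() : ref_(nullptr), size_(0) {}

  const void *ref_;     // Non-null only while this value is a reference.
  std::size_t size_;    // Byte length of the data in either mode.
  std::string owned_;   // Used only when the value owns its bytes.
};
```

Copying a reference `ToyValue` copies only the pointer, so the copy is still a reference; this is why upgrading with `ensureNotReference()` before the underlying buffer is released matters.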


[2/2] incubator-quickstep git commit: Introduced PolicyEnforcer implementation for the distributed version.

Posted by zu...@apache.org.
Introduced PolicyEnforcer implementation for the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/fd296cf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/fd296cf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/fd296cf1

Branch: refs/heads/policy-enforcer-dist
Commit: fd296cf1b0d5aad4ae76267cdb61c7a5a10fd7fe
Parents: 33554c3
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Jul 22 11:31:33 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Thu Jul 28 10:49:15 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  26 ++-
 query_execution/PolicyEnforcerBase.cpp        |  23 +++
 query_execution/PolicyEnforcerBase.hpp        |  22 +++
 query_execution/PolicyEnforcerDistributed.cpp | 199 +++++++++++++++++++++
 query_execution/PolicyEnforcerDistributed.hpp | 109 +++++++++++
 query_execution/QueryExecutionMessages.proto  |  15 +-
 query_execution/QueryExecutionTypedefs.hpp    |   7 +-
 query_execution/QueryManagerBase.cpp          |   3 +-
 query_execution/QueryManagerBase.hpp          |  26 ++-
 query_execution/QueryManagerDistributed.cpp   |  21 +++
 query_execution/QueryManagerDistributed.hpp   |   4 +
 query_execution/Shiftboss.cpp                 |  46 ++---
 query_execution/Shiftboss.hpp                 |   4 +-
 query_optimizer/CMakeLists.txt                |   4 +
 query_optimizer/QueryHandle.hpp               |  27 ++-
 15 files changed, 503 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 8bf1ab1..f91f1f2 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -35,6 +35,9 @@ endif()
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp PolicyEnforcerDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
@@ -110,6 +113,26 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
+                        glog
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_Catalog_proto
+                        quickstep_queryexecution_PolicyEnforcerBase
+                        quickstep_queryexecution_QueryContext_proto
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionState
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_QueryManagerBase
+                        quickstep_queryexecution_QueryManagerDistributed
+                        quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_utility_Macros
+                        tmb
+                        ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
                       glog
                       quickstep_catalog_CatalogTypedefs
@@ -294,10 +317,11 @@ target_link_libraries(quickstep_queryexecution
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_PolicyEnforcerDistributed
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_Shiftboss
                         quickstep_queryexecution_ShiftbossDirectory)
-endif()
+endif(ENABLE_DISTRIBUTED)
 
 # Tests:
 if (ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index d16a502..1c4b621 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -29,6 +29,7 @@
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionState.hpp"
 #include "query_execution/QueryManagerBase.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 
@@ -60,6 +61,23 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
       admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
       break;
     }
+#ifdef QUICKSTEP_DISTRIBUTED
+    case kInitiateRebuildResponseMessage: {
+      serialization::InitiateRebuildResponseMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+      query_id = proto.query_id();
+      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+      op_index = proto.operator_index();
+      const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders();
+
+      // Check if new work orders are available.
+      admitted_queries_[query_id]->processInitiateRebuildResponseMessage(op_index, num_rebuild_work_orders);
+      incrementNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders);
+      break;
+    }
+#endif  // QUICKSTEP_DISTRIBUTED
     case kRebuildWorkOrderCompleteMessage: {
       serialization::RebuildWorkOrderCompletionMessage proto;
       // Note: This proto message contains the time it took to execute the
@@ -131,8 +149,13 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
     default:
       LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
   }
+
   if (admitted_queries_[query_id]->queryStatus(op_index) ==
           QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+#ifdef QUICKSTEP_DISTRIBUTED
+    onQueryCompletion(admitted_queries_[query_id]->query_handle());
+#endif  // QUICKSTEP_DISTRIBUTED
+
     removeQuery(query_id);
     if (!waiting_queries_.empty()) {
       // Admit the earliest waiting query.
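The completion path above (remove the finished query, then admit the earliest waiting query), together with `admitQuery()` in `PolicyEnforcerDistributed.cpp`, amounts to a bounded FIFO admission queue. The following minimal C++ sketch models only that flow; `ToyAdmissionController` is hypothetical, and its limit of two concurrent queries is an arbitrary assumption standing in for `PolicyEnforcerBase::kMaxConcurrentQueries`, whose value is not shown in this diff.

```cpp
#include <cstddef>
#include <queue>
#include <set>

// Hypothetical admission controller: admit up to kMaxConcurrent queries,
// queue the rest in FIFO order, and admit the earliest waiting query when
// a running one finishes.
class ToyAdmissionController {
 public:
  static constexpr std::size_t kMaxConcurrent = 2;  // Assumed limit.

  // Returns true if admitted now; false if the ID is a duplicate or the
  // query had to be queued.
  bool admitQuery(std::size_t query_id) {
    if (admitted_.size() < kMaxConcurrent) {
      return admitted_.insert(query_id).second;  // false on duplicate ID
    }
    waiting_.push(query_id);
    return false;
  }

  // Called once a query has finished executing.
  void onQueryExecuted(std::size_t query_id) {
    admitted_.erase(query_id);
    if (!waiting_.empty()) {
      const std::size_t next = waiting_.front();
      waiting_.pop();
      admitQuery(next);
    }
  }

  bool isAdmitted(std::size_t query_id) const {
    return admitted_.count(query_id) != 0;
  }
  std::size_t numWaiting() const { return waiting_.size(); }

 private:
  std::set<std::size_t> admitted_;
  std::queue<std::size_t> waiting_;
};
```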

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 0482ebc..21cee57 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -27,6 +27,7 @@
 
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryManagerBase.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -69,6 +70,18 @@ class PolicyEnforcerBase {
     }
   }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Increment the number of queued work orders for the given worker.
+   *
+   * @param worker_index The logical ID of the given worker.
+   * @param num_new_work_orders The number of new work orders to be executed
+   *        on the Worker indexed by 'worker_index'.
+   **/
+  virtual void incrementNumQueuedWorkOrders(const std::size_t worker_index,
+                                            const std::size_t num_new_work_orders) {}
+#endif  // QUICKSTEP_DISTRIBUTED
+
   /**
    * @brief Admit multiple queries in the system.
    *
@@ -148,6 +161,15 @@ class PolicyEnforcerBase {
   void recordTimeForWorkOrder(
       const serialization::NormalWorkOrderCompletionMessage &proto);
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Add custom actions upon the completion of a query.
+   *
+   * @param query_handle The query handle.
+   **/
+  virtual void onQueryCompletion(QueryHandle *query_handle) {}
+#endif  // QUICKSTEP_DISTRIBUTED
+
   CatalogDatabaseLite *catalog_database_;
 
   const bool profile_individual_workorders_;
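The two `#ifdef QUICKSTEP_DISTRIBUTED` hooks above follow a common pattern: the base class declares virtual functions with empty default bodies, calls them at fixed points in its own logic (for example, `onQueryCompletion()` is invoked from `processMessage()` just before `removeQuery()`), and only the distributed subclass overrides them. The sketch below shows the pattern in isolation; `ToyEnforcerBase`, `ToyEnforcerDistributed`, and `finishQuery()` are hypothetical names, and the preprocessor guard is omitted for brevity.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical base class: the hook is a virtual no-op, so subclasses that
// do not care (e.g. a single-node variant) need not implement it.
class ToyEnforcerBase {
 public:
  virtual ~ToyEnforcerBase() {}

  // Shared bookkeeping first, then the subclass-specific hook.
  void finishQuery(const std::string &query_name) {
    ++num_finished_;
    onQueryCompletion(query_name);
  }

  std::size_t numFinished() const { return num_finished_; }

 protected:
  virtual void onQueryCompletion(const std::string & /* query_name */) {}

 private:
  std::size_t num_finished_ = 0;
};

// Hypothetical distributed subclass: overrides the hook, here just recording
// which queries would have their results saved.
class ToyEnforcerDistributed final : public ToyEnforcerBase {
 public:
  const std::vector<std::string>& saved() const { return saved_; }

 private:
  void onQueryCompletion(const std::string &query_name) override {
    saved_.push_back(query_name);
  }

  std::vector<std::string> saved_;
};
```

Because the default bodies are empty, a mistyped override signature would silently declare a new function instead of overriding the hook; marking overrides with `override`, as `PolicyEnforcerDistributed.hpp` does, turns that mistake into a compile error.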

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
new file mode 100644
index 0000000..65a1c82
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -0,0 +1,199 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/QueryManagerDistributed.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that "
+              "can be allocated in a single round of dispatch of messages to "
+              "Shiftbosses.");
+
+void PolicyEnforcerDistributed::getWorkOrderMessages(
+    vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
+  // Iterate over admitted queries until either there are no more
+  // messages available, or the maximum number of messages have
+  // been collected.
+  DCHECK(work_order_messages->empty());
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  if (admitted_queries_.empty()) {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+
+  const std::size_t per_query_share =
+      FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+  DCHECK_GT(per_query_share, 0u);
+
+  vector<std::size_t> finished_queries_ids;
+
+  for (const auto &admitted_query_info : admitted_queries_) {
+    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
+    DCHECK(curr_query_manager != nullptr);
+    std::size_t messages_collected_curr_query = 0;
+    while (messages_collected_curr_query < per_query_share) {
+      S::WorkOrderMessage *next_work_order_message =
+          static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0);
+      if (next_work_order_message != nullptr) {
+        ++messages_collected_curr_query;
+        work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
+      } else {
+        // No more work orders from the current query at this time.
+        // Check if the query's execution is over.
+        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          // If the query has been executed, remove it.
+          finished_queries_ids.push_back(admitted_query_info.first);
+        }
+        break;
+      }
+    }
+  }
+  for (const std::size_t finished_qid : finished_queries_ids) {
+    removeQuery(finished_qid);
+  }
+}
+
+bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
+  if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
+    // Ok to admit the query.
+    const std::size_t query_id = query_handle->query_id();
+    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+      // NOTE(zuyu): Must be called before constructing a 'QueryManager'.
+      // Otherwise, an InitiateRebuildMessage may be sent before the
+      // 'QueryContext' is initialized.
+      initiateQueryInShiftboss(query_handle);
+
+      // Query with the same ID not present, ok to admit.
+      admitted_queries_[query_id].reset(
+          new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+      return true;
+    } else {
+      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+      return false;
+    }
+  } else {
+    // This query will have to wait.
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+}
+
+void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
+  serialization::QueryInitiateMessage proto;
+  proto.set_query_id(query_handle->query_id());
+  proto.mutable_catalog_database_cache()->MergeFrom(query_handle->getCatalogDatabaseCacheProto());
+  proto.mutable_query_context()->MergeFrom(query_handle->getQueryContextProto());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kQueryInitiateMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "ForemanDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage
+            << "') to Shiftboss 0";
+
+  // TODO(zuyu): Multiple Shiftbosses support.
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     foreman_client_id_,
+                                     shiftboss_directory_->getClientId(0),
+                                     move(message));
+
+  // Block until the Shiftboss replies with a QueryInitiateResponseMessage.
+  const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
+  const TaggedMessage &tagged_message = annotated_message.tagged_message;
+  DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type());
+  LOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
+            << "' message from client " << annotated_message.sender;
+
+  serialization::QueryInitiateResponseMessage proto_response;
+  CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+}
+
+void PolicyEnforcerDistributed::onQueryCompletion(QueryHandle *query_handle) {
+  const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+  if (query_result == nullptr) {
+    return;
+  }
+
+  serialization::SaveQueryResultMessage proto;
+  proto.set_relation_id(query_result->getID());
+
+  const vector<block_id> blocks(query_result->getBlocksSnapshot());
+  for (const block_id block : blocks) {
+    proto.add_blocks(block);
+  }
+
+  proto.set_cli_id(query_handle->getClientId());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kSaveQueryResultMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "ForemanDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage
+            << "') to Shiftboss 0";
+  // TODO(zuyu): Support multiple shiftbosses.
+  QueryExecutionUtil::SendTMBMessage(bus_,
+                                     foreman_client_id_,
+                                     shiftboss_directory_->getClientId(0),
+                                     move(message));
+}
+
+}  // namespace quickstep
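`getWorkOrderMessages()` divides a fixed per-round budget (`FLAGS_max_msgs_per_dispatch_round`, default 20) evenly among the admitted queries, pulls up to that share of messages from each, and removes queries that have no more work and have finished executing. A simplified, self-contained C++ model of one such round follows; `ToyQuery` and `CollectRound()` are hypothetical, and work orders are represented by plain `int` IDs instead of `serialization::WorkOrderMessage` protos.

```cpp
#include <cstddef>
#include <deque>
#include <map>
#include <vector>

// Hypothetical stand-in for a query manager's pending work.
struct ToyQuery {
  std::deque<int> pending_work_orders;  // Work order IDs ready to dispatch.
  bool execution_finished = false;
};

// Collects at most 'max_msgs_per_round' work orders, split evenly across
// queries; removes queries whose work is exhausted and that have finished.
std::vector<int> CollectRound(std::map<std::size_t, ToyQuery> *queries,
                              std::size_t max_msgs_per_round) {
  std::vector<int> collected;
  if (queries->empty()) {
    return collected;
  }
  const std::size_t per_query_share = max_msgs_per_round / queries->size();
  std::vector<std::size_t> finished_ids;
  for (auto &entry : *queries) {
    ToyQuery &query = entry.second;
    std::size_t taken = 0;
    while (taken < per_query_share) {
      if (query.pending_work_orders.empty()) {
        if (query.execution_finished) {
          finished_ids.push_back(entry.first);
        }
        break;
      }
      collected.push_back(query.pending_work_orders.front());
      query.pending_work_orders.pop_front();
      ++taken;
    }
  }
  // Erase only after the loop so the iteration above stays valid.
  for (const std::size_t id : finished_ids) {
    queries->erase(id);
  }
  return collected;
}
```

Finished queries are erased only after the loop, mirroring `finished_queries_ids` in the original, because erasing the element currently being visited inside a range-based `for` would invalidate the iterator. Note also that integer division makes `per_query_share` zero once more queries are admitted than the budget allows; the original guards against this with `DCHECK_GT`, while the sketch simply collects nothing.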

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
new file mode 100644
index 0000000..6e7c99a
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -0,0 +1,109 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class that ensures that a high-level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param catalog_database The CatalogDatabase used.
+   * @param shiftboss_directory The ShiftbossDirectory to use.
+   * @param bus The TMB.
+   * @param profile_individual_workorders If true, profile each normal work order.
+   **/
+  PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
+                            CatalogDatabaseLite *catalog_database,
+                            ShiftbossDirectory *shiftboss_directory,
+                            tmb::MessageBus *bus,
+                            const bool profile_individual_workorders = false)
+      : PolicyEnforcerBase(catalog_database, profile_individual_workorders),
+        foreman_client_id_(foreman_client_id),
+        shiftboss_directory_(shiftboss_directory),
+        bus_(bus) {}
+
+  /**
+   * @brief Destructor.
+   **/
+  ~PolicyEnforcerDistributed() override {}
+
+  void incrementNumQueuedWorkOrders(const std::size_t worker_index,
+                                    const std::size_t num_new_work_orders) override {
+    shiftboss_directory_->addNumQueuedWorkOrders(worker_index, num_new_work_orders);
+  }
+
+  bool admitQuery(QueryHandle *query_handle) override;
+
+  /**
+   * @brief Get work order messages to be dispatched. These messages come from
+   *        the active queries.
+   *
+   * @param work_order_messages The work order messages to be dispatched.
+   **/
+  void getWorkOrderMessages(
+      std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
+
+ private:
+  void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) override {
+    shiftboss_directory_->decrementNumQueuedWorkOrders(shiftboss_index);
+  }
+
+  void onQueryCompletion(QueryHandle *query_handle) override;
+
+  void initiateQueryInShiftboss(QueryHandle *query_handle);
+
+  const tmb::client_id foreman_client_id_;
+
+  ShiftbossDirectory *shiftboss_directory_;
+
+  tmb::MessageBus *bus_;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
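`PolicyEnforcerDistributed` keeps no load counts of its own; it forwards `incrementNumQueuedWorkOrders()` and `decrementNumQueuedWorkOrders()` to the `ShiftbossDirectory`. The hypothetical `ToyShiftbossLoad` class below sketches what such per-Shiftboss bookkeeping could look like. Its method names echo the calls in the header above, but the storage and the underflow check are assumptions, not the real `ShiftbossDirectory` implementation.

```cpp
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical per-Shiftboss counter of queued work orders.
class ToyShiftbossLoad {
 public:
  explicit ToyShiftbossLoad(std::size_t num_shiftbosses)
      : queued_(num_shiftbosses, 0) {}

  // E.g. after an InitiateRebuildResponseMessage reports 'n' rebuild work
  // orders for the Shiftboss at 'index'.
  void addNumQueuedWorkOrders(std::size_t index, std::size_t n) {
    queued_.at(index) += n;
  }

  // E.g. after a work order completion is reported by that Shiftboss.
  void decrementNumQueuedWorkOrders(std::size_t index) {
    if (queued_.at(index) == 0) {
      throw std::logic_error("queued work order count would underflow");
    }
    --queued_[index];
  }

  std::size_t numQueued(std::size_t index) const { return queued_.at(index); }

 private:
  std::vector<std::size_t> queued_;  // One counter per Shiftboss index.
};
```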

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 591ca6c..dc1d2df 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -111,14 +111,25 @@ message InitiateRebuildResponseMessage {
   required uint64 query_id = 1;
   required uint64 operator_index = 2;
   required uint64 num_rebuild_work_orders = 3;
+  required uint64 shiftboss_index = 4;
 }
 
-message QueryResultRelationMessage {
+message SaveQueryResultMessage {
   required int32 relation_id = 1;
   repeated fixed64 blocks = 2 [packed=true];
+
+  // A tmb::client_id, as defined in "tmb/id_typedefs.h".
+  required uint32 cli_id = 3;
 }
 
-message QueryResultRelationResponseMessage {
+message SaveQueryResultResponseMessage {
+  required int32 relation_id = 1;
+
+  // A tmb::client_id, as defined in "tmb/id_typedefs.h".
+  required uint32 cli_id = 2;
+}
+
+message QueryResultRelationMessage {
   required int32 relation_id = 1;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index d73d4ee..5c52f61 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -81,8 +81,11 @@ enum QueryExecutionMessageType : message_type_id {
   kInitiateRebuildMessage,  // From Foreman to Shiftboss.
   kInitiateRebuildResponseMessage,  // From Shiftboss to Foreman.
 
-  kQueryResultRelationMessage,  // From Foreman to Shiftboss.
-  kQueryResultRelationResponseMessage,  // From Shiftboss to Foreman.
+  kSaveQueryResultMessage,  // From Foreman to Shiftboss.
+  kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
+
+  kQueryResultRelationMessage,  // From Foreman to CLI.
+  kQueryResultRelationResponseMessage,  // From CLI to Foreman.
 
   // BlockLocator related messages, sorted in a life cycle of StorageManager
   // with a unique block domain.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index d2a3341..4ee51c3 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -35,7 +35,8 @@ using std::pair;
 namespace quickstep {
 
 QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
-    : query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
+    : query_handle_(query_handle),
+      query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
       query_dag_(DCHECK_NOTNULL(
           DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
       num_operators_in_dag_(query_dag_->size()),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 6edfd5c..0f9afd6 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -24,6 +24,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryExecutionState.hpp"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -79,6 +80,13 @@ class QueryManagerBase {
   }
 
   /**
+   * @brief Get the query handle.
+   **/
+  inline QueryHandle* query_handle() {
+    return query_handle_;
+  }
+
+  /**
    * @brief Process the received WorkOrder complete message.
    *
    * @param op_index The index of the specified operator node in the query DAG
@@ -128,6 +136,20 @@ class QueryManagerBase {
   void processFeedbackMessage(const dag_node_index op_index,
                               const WorkOrder::FeedbackMessage &message);
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Process the initiate rebuild work order response message.
+   *
+   * @param op_index The index of the specified operator node in the query DAG
+   *        for initiating the rebuild work order.
+   * @param num_rebuild_work_orders The number of rebuild work orders
+   *        generated for the operator indexed by 'op_index'.
+   **/
+  virtual void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                                     const std::size_t num_rebuild_work_orders) {}
+#endif  // QUICKSTEP_DISTRIBUTED
+
   /**
    * @brief Get the query status after processing an incoming message.
    *
@@ -250,9 +272,11 @@ class QueryManagerBase {
     return query_exec_state_->hasRebuildInitiated(index);
   }
 
+  QueryHandle *query_handle_;
+
   const std::size_t query_id_;
 
-  DAG<RelationalOperator, bool> *query_dag_;
+  DAG<RelationalOperator, bool> *query_dag_;  // Owned by 'query_handle_'.
   const dag_node_index num_operators_in_dag_;
 
   // For all nodes, store their receiving dependents.

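The base class gains a virtual hook with an empty body, and only the distributed manager overrides it. This means single-node managers need no changes and the Foreman can dispatch through a base pointer. A simplified C++ sketch of that pattern follows. `ManagerBase` and `ManagerDistributed` are hypothetical toys, and the `#ifdef QUICKSTEP_DISTRIBUTED` guard used in the real header is omitted here.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical toy mirroring the shape of QueryManagerBase: a no-op default
// hook, so managers that never see this message need not implement it.
class ManagerBase {
 public:
  virtual ~ManagerBase() = default;

  virtual void processInitiateRebuildResponseMessage(
      std::size_t /*op_index*/, std::size_t /*num_rebuild_work_orders*/) {}
};

// Hypothetical toy mirroring QueryManagerDistributed: overrides the hook and
// records the arguments so the dispatch can be observed.
class ManagerDistributed final : public ManagerBase {
 public:
  void processInitiateRebuildResponseMessage(
      std::size_t op_index, std::size_t num_rebuild_work_orders) override {
    last_op_index = op_index;
    last_num_rebuild_work_orders = num_rebuild_work_orders;
  }

  std::size_t last_op_index = 0;
  std::size_t last_num_rebuild_work_orders = 0;
};
```

Calling the hook through a `ManagerBase&` that refers to a `ManagerDistributed` reaches the override; calling it on a plain `ManagerBase` does nothing.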
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index e906fa5..8376b2f 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -119,6 +119,27 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index)
   return generated_new_workorder_protos;
 }
 
+void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                                                    const std::size_t num_rebuild_work_orders) {
+  // TODO(zuyu): Multiple workers support.
+  query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true);
+
+  if (num_rebuild_work_orders != 0u) {
+    // Wait for the rebuild work orders to finish.
+    return;
+  }
+
+  markOperatorFinished(op_index);
+
+  for (const std::pair<dag_node_index, bool> &dependent_link :
+       query_dag_->getDependents(op_index)) {
+    const dag_node_index dependent_op_index = dependent_link.first;
+    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+      processOperator(dependent_op_index, true);
+    }
+  }
+}
+
 bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
   DCHECK(checkRebuildRequired(index));
   DCHECK(!checkRebuildInitiated(index));

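`processInitiateRebuildResponseMessage` records how many rebuild work orders were generated. If there are any, it returns and waits for them. Otherwise it marks the operator finished and schedules each dependent whose dependencies are now met. The toy C++ model below sketches that control flow. `ToyDag`, `ToyManager` and their members are hypothetical. For simplicity, the model treats every dependency as blocking, whereas the real code calls `checkAllBlockingDependenciesMet`.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical DAG: dependents[i] holds (dependent index, link flag) pairs,
// dependencies[i] holds the operators that operator i waits on.
struct ToyDag {
  std::vector<std::vector<std::pair<std::size_t, bool>>> dependents;
  std::vector<std::vector<std::size_t>> dependencies;
};

struct ToyManager {
  explicit ToyManager(ToyDag d)
      : dag(std::move(d)),
        finished(dag.dependencies.size(), false),
        pending_rebuilds(dag.dependencies.size(), 0) {}

  // Simplification: every dependency is treated as blocking.
  bool allDependenciesMet(std::size_t op) const {
    for (std::size_t dep : dag.dependencies[op]) {
      if (!finished[dep]) return false;
    }
    return true;
  }

  void onInitiateRebuildResponse(std::size_t op_index,
                                 std::size_t num_rebuild_work_orders) {
    pending_rebuilds[op_index] = num_rebuild_work_orders;
    if (num_rebuild_work_orders != 0u) {
      // Rebuild work orders are outstanding; their completion finishes
      // the operator later.
      return;
    }
    finished[op_index] = true;
    for (const auto &link : dag.dependents[op_index]) {
      if (allDependenciesMet(link.first)) scheduled.push_back(link.first);
    }
  }

  ToyDag dag;
  std::vector<bool> finished;
  std::vector<std::size_t> pending_rebuilds;
  std::vector<std::size_t> scheduled;  // Stand-in for processOperator().
};
```

For a DAG where operators 0 and 1 both feed operator 2, a response with a nonzero count for operator 0 schedules nothing. A later zero-count response, after operator 1 has finished, schedules operator 2.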
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 8641c22..131cd86 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -15,6 +15,7 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
 
+#include <cstddef>
 #include <memory>
 
 #include "query_execution/QueryExecutionState.hpp"
@@ -60,6 +61,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   bool fetchNormalWorkOrders(const dag_node_index index) override;
 
+  void processInitiateRebuildResponseMessage(const dag_node_index op_index,
+                                             const std::size_t num_rebuild_work_orders) override;
+
  /**
    * @brief Get the next normal workorder to be executed, wrapped in a
    *        WorkOrderMessage proto.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index af56306..6aa48bc 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -149,11 +149,10 @@ void Shiftboss::run() {
                                            move(annotated_message.tagged_message));
         break;
       }
-      case kQueryResultRelationMessage: {
-        // TODO(zuyu): Rename to kSaveQueryResultMessage.
+      case kSaveQueryResultMessage: {
         const TaggedMessage &tagged_message = annotated_message.tagged_message;
 
-        serialization::QueryResultRelationMessage proto;
+        serialization::SaveQueryResultMessage proto;
         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 
         for (int i = 0; i < proto.blocks_size(); ++i) {
@@ -168,25 +167,26 @@ void Shiftboss::run() {
           }
         }
 
-        serialization::QueryResultRelationResponseMessage ack_proto;
-        ack_proto.set_relation_id(proto.relation_id());
+        serialization::SaveQueryResultResponseMessage proto_response;
+        proto_response.set_relation_id(proto.relation_id());
+        proto_response.set_cli_id(proto.cli_id());
 
-        const size_t ack_proto_length = ack_proto.ByteSize();
-        char *ack_proto_bytes = static_cast<char*>(malloc(ack_proto_length));
-        CHECK(ack_proto.SerializeToArray(ack_proto_bytes, ack_proto_length));
+        const size_t proto_response_length = proto_response.ByteSize();
+        char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length));
+        CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length));
 
-        TaggedMessage ack_message(static_cast<const void*>(ack_proto_bytes),
-                                  ack_proto_length,
-                                  kQueryResultRelationResponseMessage);
-        free(ack_proto_bytes);
+        TaggedMessage message_response(static_cast<const void*>(proto_response_bytes),
+                                       proto_response_length,
+                                       kSaveQueryResultResponseMessage);
+        free(proto_response_bytes);
 
         LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
-                  << "') sent QueryResultRelationResponseMessage (typed '" << kQueryResultRelationResponseMessage
-                  << ") to Foreman";
+                  << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage
+                  << "') to Foreman";
         QueryExecutionUtil::SendTMBMessage(bus_,
                                            shiftboss_client_id_,
                                            foreman_client_id_,
-                                           move(ack_message));
+                                           move(message_response));
         break;
       }
       case kPoisonMessage: {
@@ -280,15 +280,15 @@ void Shiftboss::processQueryInitiateMessage(
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-  TaggedMessage ack_message(static_cast<const void*>(proto_bytes),
-                            proto_length,
-                            kQueryInitiateResponseMessage);
+  TaggedMessage message_response(static_cast<const void*>(proto_bytes),
+                                 proto_length,
+                                 kQueryInitiateResponseMessage);
   free(proto_bytes);
 
   QueryExecutionUtil::SendTMBMessage(bus_,
                                      shiftboss_client_id_,
                                      foreman_client_id_,
-                                     move(ack_message));
+                                     move(message_response));
 }
 
 void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
@@ -317,15 +317,15 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-  TaggedMessage ack_message(static_cast<const void*>(proto_bytes),
-                            proto_length,
-                            kInitiateRebuildResponseMessage);
+  TaggedMessage message_response(static_cast<const void*>(proto_bytes),
+                                 proto_length,
+                                 kInitiateRebuildResponseMessage);
   free(proto_bytes);
 
   QueryExecutionUtil::SendTMBMessage(bus_,
                                      shiftboss_client_id_,
                                      foreman_client_id_,
-                                     move(ack_message));
+                                     move(message_response));
 
   for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) {
     // NOTE(zuyu): Worker releases the memory after the execution of

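The Shiftboss reply copies `relation_id` and `cli_id` from the request into the response. Presumably this lets the Foreman know which CLI to notify without a lookup. The reply is serialized into a `malloc`'d buffer, wrapped in a `TaggedMessage`, and the buffer is freed immediately. That is only safe if the message keeps its own copy of the bytes. The C++ sketch below models the pattern. `SaveQueryResultResponse` and `ToyTaggedMessage` are hypothetical stand-ins for the protobuf message and `tmb::TaggedMessage`, and `memcpy` replaces protobuf serialization.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <vector>

// Hypothetical stand-in for serialization::SaveQueryResultResponseMessage.
struct SaveQueryResultResponse {
  std::int32_t relation_id;
  std::uint32_t cli_id;
};

// Hypothetical stand-in for a tagged message that copies its payload, which
// is what makes freeing the caller's buffer right after construction safe.
class ToyTaggedMessage {
 public:
  ToyTaggedMessage(const void *msg, std::size_t bytes, int type)
      : payload_(static_cast<const char*>(msg),
                 static_cast<const char*>(msg) + bytes),
        type_(type) {}

  const char* message() const { return payload_.data(); }
  std::size_t message_bytes() const { return payload_.size(); }
  int message_type() const { return type_; }

 private:
  std::vector<char> payload_;
  int type_;
};

// Echo the ids from the request, serialize into a malloc'd buffer, wrap the
// bytes in a message, then free the buffer.
ToyTaggedMessage MakeSaveQueryResultResponse(std::int32_t relation_id,
                                             std::uint32_t cli_id,
                                             int message_type) {
  const SaveQueryResultResponse proto_response{relation_id, cli_id};
  const std::size_t length = sizeof(proto_response);
  char *buffer = static_cast<char*>(std::malloc(length));
  if (buffer == nullptr) std::abort();
  std::memcpy(buffer, &proto_response, length);
  ToyTaggedMessage message_response(buffer, length, message_type);
  std::free(buffer);  // Safe: message_response owns its own copy.
  return message_response;
}

// Decode a reply produced by MakeSaveQueryResultResponse.
SaveQueryResultResponse ParseSaveQueryResultResponse(const ToyTaggedMessage &msg) {
  SaveQueryResultResponse out;
  std::memcpy(&out, msg.message(), sizeof(out));
  return out;
}
```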
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 096ab74..9e24d62 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -112,8 +112,8 @@ class Shiftboss : public Thread {
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
 
-    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryResultRelationMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryResultRelationResponseMessage);
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage);
 
     // Stop itself.
     bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);

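Because message types are renamed, the Shiftboss registration on the bus must change as well. A client registers the types it receives and the types it sends. The toy C++ bus below sketches that rule under a simplifying assumption: a send is allowed only when the sender is registered to send the type and the receiver is registered to receive it. `ToyBus` and its methods are hypothetical and only loosely model TMB.

```cpp
#include <cassert>
#include <set>
#include <utility>

// Hypothetical toy bus modeling per-type sender/receiver registration.
class ToyBus {
 public:
  using ClientId = unsigned;
  using MessageType = int;

  void RegisterClientAsSender(ClientId c, MessageType t) {
    senders_.insert({c, t});
  }
  void RegisterClientAsReceiver(ClientId c, MessageType t) {
    receivers_.insert({c, t});
  }

  // True only if both ends registered for this message type.
  bool CanSend(ClientId from, ClientId to, MessageType t) const {
    return senders_.count({from, t}) > 0 && receivers_.count({to, t}) > 0;
  }

 private:
  std::set<std::pair<ClientId, MessageType>> senders_;
  std::set<std::pair<ClientId, MessageType>> receivers_;
};
```

With the Shiftboss registered to receive the save request and send the save response (and the Foreman registered the other way around), both directions succeed. A type that neither side registered in that direction is rejected.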
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index a56b714..b6b97a0 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -212,6 +212,10 @@ target_link_libraries(quickstep_queryoptimizer_QueryHandle
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryoptimizer_QueryPlan
                       quickstep_utility_Macros)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryoptimizer_QueryHandle
+                        tmb)
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryoptimizer_QueryPlan
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_utility_DAG

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fd296cf1/query_optimizer/QueryHandle.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp
index 5f3649a..a2df376 100644
--- a/query_optimizer/QueryHandle.hpp
+++ b/query_optimizer/QueryHandle.hpp
@@ -20,13 +20,17 @@
 #include <cstddef>
 #include <cstdint>
 #include <memory>
-#include <utility>
 
 #include "catalog/Catalog.pb.h"
 #include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED.
 #include "query_optimizer/QueryPlan.hpp"
 #include "utility/Macros.hpp"
 
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "tmb/id_typedefs.h"
+#endif  // QUICKSTEP_DISTRIBUTED
+
 namespace quickstep {
 
 class CatalogRelation;
@@ -119,6 +123,22 @@ class QueryHandle {
     query_result_relation_ = relation;
   }
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Get the client id.
+   */
+  tmb::client_id getClientId() const {
+    return cli_id_;
+  }
+
+  /**
+   * @brief Set the client id.
+   */
+  void setClientId(const tmb::client_id cli_id) {
+    cli_id_ = cli_id;
+  }
+#endif  // QUICKSTEP_DISTRIBUTED
+
  private:
   const std::size_t query_id_;
   const std::uint64_t query_priority_;
@@ -134,6 +154,11 @@ class QueryHandle {
   //             and deleted by the Cli shell.
   const CatalogRelation *query_result_relation_;
 
+#ifdef QUICKSTEP_DISTRIBUTED
+  // The client id of the CLI which sends the query.
+  tmb::client_id cli_id_;
+#endif  // QUICKSTEP_DISTRIBUTED
+
   DISALLOW_COPY_AND_ASSIGN(QueryHandle);
 };
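In distributed builds, `QueryHandle` now carries the id of the CLI that submitted the query. The Shiftboss reads `cli_id` from the incoming `SaveQueryResultMessage`, so some Foreman-side code must copy it into that request. The following C++ sketch assumes this is done from `getClientId()`, which this diff does not show. `ToyQueryHandle`, `ToySaveQueryResultRequest` and `BuildSaveRequest` are hypothetical. The toy initializes the id to 0 and omits the `#ifdef QUICKSTEP_DISTRIBUTED` guard.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical simplification of tmb::client_id.
using ClientId = std::uint32_t;

// Hypothetical toy mirroring the new QueryHandle accessors.
class ToyQueryHandle {
 public:
  explicit ToyQueryHandle(std::size_t query_id) : query_id_(query_id) {}

  std::size_t query_id() const { return query_id_; }
  ClientId getClientId() const { return cli_id_; }
  void setClientId(ClientId cli_id) { cli_id_ = cli_id; }

 private:
  const std::size_t query_id_;
  ClientId cli_id_ = 0;  // The CLI that sent the query.
};

// Hypothetical stand-in for serialization::SaveQueryResultMessage.
struct ToySaveQueryResultRequest {
  std::int32_t relation_id;
  ClientId cli_id;
  std::vector<std::uint64_t> blocks;
};

// Assumed Foreman-side step: copy the CLI id from the handle into the save
// request so the Shiftboss can echo it back in its response.
ToySaveQueryResultRequest BuildSaveRequest(const ToyQueryHandle &handle,
                                           std::int32_t relation_id,
                                           std::vector<std::uint64_t> blocks) {
  return {relation_id, handle.getClientId(), std::move(blocks)};
}
```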