You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/07/22 18:31:53 UTC

incubator-quickstep git commit: Introduced PolicyEnforcer implementation for the distributed version.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/policy-enforcer-dist [created] 03b29a118


Introduced PolicyEnforcer implementation for the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/03b29a11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/03b29a11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/03b29a11

Branch: refs/heads/policy-enforcer-dist
Commit: 03b29a118741d0582e49f1bdcfd228048273c62f
Parents: 2b78380
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Fri Jul 22 11:31:33 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Fri Jul 22 11:31:33 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  20 +++-
 query_execution/PolicyEnforcerDistributed.cpp | 112 +++++++++++++++++++++
 query_execution/PolicyEnforcerDistributed.hpp |  96 ++++++++++++++++++
 3 files changed, 227 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/03b29a11/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index f582ba5..500eca3 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -35,6 +35,9 @@ endif()
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp PolicyEnforcerDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
@@ -109,6 +112,20 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
+                        glog
+                        quickstep_queryexecution_PolicyEnforcerBase
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionState
+                        quickstep_queryexecution_QueryManagerBase
+                        quickstep_queryexecution_QueryManagerDistributed
+                        quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_utility_Macros
+                        tmb
+                        ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
                       glog
                       quickstep_catalog_CatalogTypedefs
@@ -271,9 +288,10 @@ target_link_libraries(quickstep_queryexecution
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_PolicyEnforcerDistributed
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_ShiftbossDirectory)
-endif()
+endif(ENABLE_DISTRIBUTED)
 
 # Tests:
 if (ENABLE_DISTRIBUTED)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/03b29a11/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
new file mode 100644
index 0000000..ad65be7
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -0,0 +1,112 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/QueryManagerDistributed.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+using std::unique_ptr;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+              " can be allocated in a single round of dispatch of messages to"
+              " the workers.");
+
+void PolicyEnforcerDistributed::getWorkOrderMessages(
+    std::vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
+  // Iterate over admitted queries until either there are no more
+  // messages available, or the maximum number of messages have
+  // been collected.
+  DCHECK(work_order_messages->empty());
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  std::size_t per_query_share = 0;
+  if (!admitted_queries_.empty()) {
+    per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+  } else {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+  DCHECK_GT(per_query_share, 0u);
+  std::vector<std::size_t> finished_queries_ids;
+
+  for (const auto &admitted_query_info : admitted_queries_) {
+    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
+    DCHECK(curr_query_manager != nullptr);
+    std::size_t messages_collected_curr_query = 0;
+    while (messages_collected_curr_query < per_query_share) {
+      S::WorkOrderMessage *next_work_order_message =
+          static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0);
+      if (next_work_order_message != nullptr) {
+        ++messages_collected_curr_query;
+        work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
+      } else {
+        // No more work ordes from the current query at this time.
+        // Check if the query's execution is over.
+        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+          // If the query has been executed, remove it.
+          finished_queries_ids.push_back(admitted_query_info.first);
+        }
+        break;
+      }
+    }
+  }
+  for (const std::size_t finished_qid : finished_queries_ids) {
+    removeQuery(finished_qid);
+  }
+}
+
+bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
+  if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
+    // Ok to admit the query.
+    const std::size_t query_id = query_handle->query_id();
+    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+      // Query with the same ID not present, ok to admit.
+      admitted_queries_[query_id].reset(
+          new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_));
+      return true;
+    } else {
+      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+      return false;
+    }
+  } else {
+    // This query will have to wait.
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+}
+
+void PolicyEnforcerDistributed::decrementNumQueuedWorkOrders(const std::size_t worker_index) {
+  shiftboss_directory_->decrementNumQueuedWorkOrders(worker_index);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/03b29a11/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
new file mode 100644
index 0000000..5bf55a9
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -0,0 +1,96 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class ShiftbossDirectory;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param catalog_database The CatalogDatabase used.
+   * @param bus The TMB.
+   **/
+  PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
+                            CatalogDatabaseLite *catalog_database,
+                            ShiftbossDirectory *shiftboss_directory,
+                            tmb::MessageBus *bus,
+                            const bool profile_individual_workorders = false)
+      : PolicyEnforcerBase(catalog_database, profile_individual_workorders),
+        foreman_client_id_(foreman_client_id),
+        shiftboss_directory_(shiftboss_directory),
+        bus_(bus) {}
+
+  /**
+   * @brief Destructor.
+   **/
+  ~PolicyEnforcerDistributed() override {}
+
+  bool admitQuery(QueryHandle *query_handle) override;
+
+  /**
+   * @brief Get work order messages to be dispatched. These messages come from
+   *        the active queries.
+   *
+   * @param work_order_messages The work order messages to be dispatched.
+   **/
+  void getWorkOrderMessages(
+      std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
+
+ private:
+  void decrementNumQueuedWorkOrders(const std::size_t worker_index) override;
+
+  const tmb::client_id foreman_client_id_;
+
+  ShiftbossDirectory *shiftboss_directory_;
+
+  tmb::MessageBus *bus_;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_