Posted to dev@quickstep.apache.org by zuyu <gi...@git.apache.org> on 2016/08/14 06:39:27 UTC

[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

GitHub user zuyu opened a pull request:

    https://github.com/apache/incubator-quickstep/pull/93

    Added ForemanDistributed.

    Assigned to @hbdeshmukh. Thanks!
    
    This is an initial version that accepts only one `Shiftboss`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/incubator-quickstep dist-foreman

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-quickstep/pull/93.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #93
    
----
commit 626f726aeff45de08226edf3a51fa83bc2884a7f
Author: Zuyu Zhang <zu...@twitter.com>
Date:   2016-08-14T06:37:59Z

    Added ForemanDistributed.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by hbdeshmukh <gi...@git.apache.org>.
Github user hbdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74834615
  
    --- Diff: query_execution/CMakeLists.txt ---
    @@ -33,6 +33,9 @@ if (ENABLE_DISTRIBUTED)
       add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
     endif(ENABLE_DISTRIBUTED)
     add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
    +if (ENABLE_DISTRIBUTED)
    +  add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
    +endif(ENABLE_DISTRIBUTED)
    --- End diff --
    
    Why should we rely on inconsistencies elsewhere in the project to introduce new ones? This is not a supremely important issue, so I won't debate it any further; however, I don't agree with the reasons mentioned above.



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74824764
  
    --- Diff: query_execution/ForemanDistributed.hpp ---
    @@ -0,0 +1,130 @@
    +/**
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + **/
    +
    +#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
    +#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
    +
    +#include <cstddef>
    +#include <cstdio>
    +#include <memory>
    +#include <vector>
    +
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "query_execution/ForemanBase.hpp"
    +#include "query_execution/PolicyEnforcerDistributed.hpp"
    +#include "query_execution/ShiftbossDirectory.hpp"
    +#include "utility/Macros.hpp"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace tmb { class MessageBus; }
    +
    +namespace quickstep {
    +
    +class CatalogDatabaseLite;
    +
    +namespace serialization { class WorkOrderMessage; }
    +
    +/** \addtogroup QueryExecution
    + *  @{
    + */
    +
    +/**
    + * @brief The Foreman receives queries from the main thread, messages from the
    + *        policy enforcer and dispatches the work to Shiftbosses. It also
    + *        receives work completion messages from Shiftbosses.
    --- End diff --
    
    Yes.



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by hbdeshmukh <gi...@git.apache.org>.
Github user hbdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74824057
  
    --- Diff: query_execution/ForemanDistributed.cpp ---
    @@ -0,0 +1,334 @@
    +/**
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + **/
    +
    +#include "query_execution/ForemanDistributed.hpp"
    +
    +#include <cstddef>
    +#include <cstdio>
    +#include <cstdlib>
    +#include <memory>
    +#include <utility>
    +#include <vector>
    +
    +#include "catalog/Catalog.pb.h"
    +#include "catalog/CatalogDatabase.hpp"
    +#include "catalog/CatalogRelation.hpp"
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "query_execution/AdmitRequestMessage.hpp"
    +#include "query_execution/PolicyEnforcerDistributed.hpp"
    +#include "query_execution/QueryExecutionMessages.pb.h"
    +#include "query_execution/QueryExecutionTypedefs.hpp"
    +#include "query_execution/QueryExecutionUtil.hpp"
    +#include "query_execution/ShiftbossDirectory.hpp"
    +#include "threading/ThreadUtil.hpp"
    +#include "utility/EqualsAnyConstant.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/address.h"
    +#include "tmb/id_typedefs.h"
    +#include "tmb/message_bus.h"
    +#include "tmb/message_style.h"
    +#include "tmb/tagged_message.h"
    +
    +using std::move;
    +using std::size_t;
    +using std::unique_ptr;
    +using std::vector;
    +
    +using tmb::AnnotatedMessage;
    +using tmb::MessageBus;
    +using tmb::TaggedMessage;
    +using tmb::client_id;
    +
    +namespace quickstep {
    +
    +namespace S = serialization;
    +
    +class QueryHandle;
    +
    +ForemanDistributed::ForemanDistributed(
    +    MessageBus *bus,
    +    CatalogDatabaseLite *catalog_database,
    +    const int cpu_id,
    +    const bool profile_individual_workorders)
    +    : ForemanBase(bus, cpu_id),
    +      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
    +  const std::vector<QueryExecutionMessageType> sender_message_types{
    +      kShiftbossRegistrationResponseMessage,
    +      kQueryInitiateMessage,
    +      kWorkOrderMessage,
    +      kInitiateRebuildMessage,
    +      kQueryTeardownMessage,
    +      kSaveQueryResultMessage,
    +      kQueryExecutionSuccessMessage,
    +      kPoisonMessage};
    +
    +  for (const auto message_type : sender_message_types) {
    +    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
    +  }
    +
    +  const std::vector<QueryExecutionMessageType> receiver_message_types{
    +      kShiftbossRegistrationMessage,
    +      kAdmitRequestMessage,
    +      kQueryInitiateResponseMessage,
    +      kCatalogRelationNewBlockMessage,
    +      kDataPipelineMessage,
    +      kInitiateRebuildResponseMessage,
    +      kWorkOrderCompleteMessage,
    +      kRebuildWorkOrderCompleteMessage,
    +      kWorkOrderFeedbackMessage,
    +      kSaveQueryResultResponseMessage,
    +      kPoisonMessage};
    +
    +  for (const auto message_type : receiver_message_types) {
    +    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
    +  }
    +
    +  policy_enforcer_.reset(new PolicyEnforcerDistributed(
    +      foreman_client_id_,
    +      catalog_database_,
    +      &shiftboss_directory_,
    +      bus_,
    +      profile_individual_workorders));
    +}
    +
    +void ForemanDistributed::run() {
    +  if (cpu_id_ >= 0) {
    +    // We can pin the foreman thread to a CPU if specified.
    +    ThreadUtil::BindToCPU(cpu_id_);
    +  }
    +
    +  // Ensure that at least one Shiftboss to register.
    +  if (shiftboss_directory_.empty()) {
    +    const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
    +    const TaggedMessage &tagged_message = annotated_message.tagged_message;
    +    DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
    +    LOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
    +              << "' message from client " << annotated_message.sender;
    +
    +    S::ShiftbossRegistrationMessage proto;
    +    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
    +
    +    processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
    +    DCHECK_EQ(1u, shiftboss_directory_.size());
    +  }
    +
    +  // Event loop
    +  for (;;) {
    +    // Receive() causes this thread to sleep until next message is received.
    +    const AnnotatedMessage annotated_message =
    +        bus_->Receive(foreman_client_id_, 0, true);
    +    const TaggedMessage &tagged_message = annotated_message.tagged_message;
    +    const tmb::message_type_id message_type = tagged_message.message_type();
    +    LOG(INFO) << "ForemanDistributed received typed '" << message_type
    +              << "' message from client " << annotated_message.sender;
    +    switch (message_type) {
    +      case kShiftbossRegistrationMessage: {
    +        S::ShiftbossRegistrationMessage proto;
    +        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
    +
    +        processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
    +        break;
    +      }
    +      case kAdmitRequestMessage: {
    +        const AdmitRequestMessage *request_message =
    +            static_cast<const AdmitRequestMessage*>(tagged_message.message());
    +
    +        const vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
    +        DCHECK(!query_handles.empty());
    +
    +        bool all_queries_admitted = true;
    +        if (query_handles.size() == 1u) {
    +          all_queries_admitted =
    +              policy_enforcer_->admitQuery(query_handles.front());
    +        } else {
    +          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
    +        }
    +        if (!all_queries_admitted) {
    +          LOG(WARNING) << "The scheduler could not admit all the queries";
    +          // TODO(harshad) - Inform the main thread about the failure.
    +        }
    +        break;
    +      }
    +      case kQueryInitiateResponseMessage: {
    +        // TODO(zuyu): check the query id.
    +        break;
    +      }
    +      case kCatalogRelationNewBlockMessage:  // Fall through
    +      case kDataPipelineMessage:
    +      case kRebuildWorkOrderCompleteMessage:
    +      case kWorkOrderCompleteMessage:
    +      case kWorkOrderFeedbackMessage: {
    +        policy_enforcer_->processMessage(tagged_message);
    +        break;
    +      }
    +      case kInitiateRebuildResponseMessage: {
    +        // A unique case in the distributed version.
    +        policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
    +        break;
    +      }
    +      case kSaveQueryResultResponseMessage: {
    +        S::SaveQueryResultResponseMessage proto;
    +        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
    +
    +        processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
    +        break;
    +      }
    +      case kPoisonMessage: {
    +        if (policy_enforcer_->hasQueries()) {
    +          LOG(WARNING) << "Foreman thread exiting while some queries are "
    +                          "under execution or waiting to be admitted";
    +        }
    +
    +        // Shutdown all Shiftbosses.
    +        tmb::Address shiftboss_addresses;
    +        for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
    +          shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
    +        }
    +
    +        tmb::MessageStyle broadcast_style;
    +        broadcast_style.Broadcast(true);
    +
    +        TaggedMessage poison_message(kPoisonMessage);
    +
    +        const MessageBus::SendStatus send_status =
    +            bus_->Send(foreman_client_id_,
    +                       shiftboss_addresses,
    +                       broadcast_style,
    +                       move(poison_message));
    +        DCHECK(send_status == MessageBus::SendStatus::kOK);
    +        return;
    +      }
    +      default:
    +        LOG(FATAL) << "Unknown message type to Foreman";
    +    }
    +
    +    if (canCollectNewMessages(message_type)) {
    +      vector<unique_ptr<S::WorkOrderMessage>> new_messages;
    +      policy_enforcer_->getWorkOrderProtoMessages(&new_messages);
    +      dispatchWorkOrderMessages(new_messages);
    +    }
    +  }
    +}
    +
    +bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
    +  return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
    +                                        kCatalogRelationNewBlockMessage,
    +                                        kWorkOrderFeedbackMessage) &&
    +         // TODO(zuyu): Multiple Shiftbosses support.
    +         !shiftboss_directory_.hasReachedCapacity(0);
    --- End diff --
    
    Can we create a static variable named something like ``kShiftBossIndex`` to replace the ``0`` used here and elsewhere in the file?
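
The suggestion above can be sketched roughly as follows. This is a minimal illustration, not the code in this pull request: the `ShiftbossDirectory` class below is a simplified, hypothetical stand-in that borrows only the `hasReachedCapacity` name from the diff, and `addShiftboss`, `incrementNumQueuedWorkOrders`, `canDispatchMoreWork`, and `kDefaultShiftbossIndex` are names assumed for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, simplified stand-in for the directory used in the diff.
// It only tracks a capacity and a queued-work-order count per Shiftboss.
class ShiftbossDirectory {
 public:
  void addShiftboss(const std::size_t work_order_capacity) {
    entries_.push_back(Entry{work_order_capacity, 0u});
  }

  void incrementNumQueuedWorkOrders(const std::size_t shiftboss_index) {
    assert(shiftboss_index < entries_.size());
    ++entries_[shiftboss_index].num_queued;
  }

  bool hasReachedCapacity(const std::size_t shiftboss_index) const {
    assert(shiftboss_index < entries_.size());
    const Entry &entry = entries_[shiftboss_index];
    return entry.num_queued >= entry.capacity;
  }

 private:
  struct Entry {
    std::size_t capacity;
    std::size_t num_queued;
  };

  std::vector<Entry> entries_;
};

// Assumed name: a single named constant instead of a bare 0 at each call
// site, so the single-Shiftboss assumption is visible and easy to find
// once multiple Shiftbosses are supported.
constexpr std::size_t kDefaultShiftbossIndex = 0u;

// Hypothetical helper mirroring the capacity check in the diff.
bool canDispatchMoreWork(const ShiftbossDirectory &directory) {
  return !directory.hasReachedCapacity(kDefaultShiftbossIndex);
}
```

With a named constant, every place that assumes a single Shiftboss can be located by searching for one identifier when multi-Shiftboss support is added.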



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74822173
  
    --- Diff: query_execution/CMakeLists.txt ---
    @@ -33,6 +33,9 @@ if (ENABLE_DISTRIBUTED)
       add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
     endif(ENABLE_DISTRIBUTED)
     add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
    +if (ENABLE_DISTRIBUTED)
    +  add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
    +endif(ENABLE_DISTRIBUTED)
    --- End diff --
    
    This is consistent with almost every `CMakeLists.txt`, if not all of them: `if (xxx)` has a space before the parenthesis, while everything else, including `elseif(xxx)` and `else(xxx)`, has no space in between.



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74824887
  
    --- Diff: query_execution/ForemanDistributed.cpp ---
    @@ -0,0 +1,334 @@
    +      case kCatalogRelationNewBlockMessage:  // Fall through
    +      case kDataPipelineMessage:
    +      case kRebuildWorkOrderCompleteMessage:
    +      case kWorkOrderCompleteMessage:
    +      case kWorkOrderFeedbackMessage: {
    +        policy_enforcer_->processMessage(tagged_message);
    +        break;
    +      }
    +      case kInitiateRebuildResponseMessage: {
    +        // A unique case in the distributed version.
    --- End diff --
    
    It means that this message is unique to the distributed version.



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by hbdeshmukh <gi...@git.apache.org>.
Github user hbdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74823424
  
    --- Diff: query_execution/ForemanDistributed.cpp ---
    @@ -0,0 +1,334 @@
    +      case kCatalogRelationNewBlockMessage:  // Fall through
    +      case kDataPipelineMessage:
    +      case kRebuildWorkOrderCompleteMessage:
    +      case kWorkOrderCompleteMessage:
    +      case kWorkOrderFeedbackMessage: {
    +        policy_enforcer_->processMessage(tagged_message);
    +        break;
    +      }
    +      case kInitiateRebuildResponseMessage: {
    +        // A unique case in the distributed version.
    --- End diff --
    
    What's the meaning of the comment: ``A unique case in the distributed version.``? 



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by hbdeshmukh <gi...@git.apache.org>.
Github user hbdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74823840
  
    --- Diff: query_execution/ForemanDistributed.cpp ---
    @@ -0,0 +1,334 @@
    +/**
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + **/
    +
    +#include "query_execution/ForemanDistributed.hpp"
    +
    +#include <cstddef>
    +#include <cstdio>
    +#include <cstdlib>
    +#include <memory>
    +#include <utility>
    +#include <vector>
    +
    +#include "catalog/Catalog.pb.h"
    +#include "catalog/CatalogDatabase.hpp"
    +#include "catalog/CatalogRelation.hpp"
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "query_execution/AdmitRequestMessage.hpp"
    +#include "query_execution/PolicyEnforcerDistributed.hpp"
    +#include "query_execution/QueryExecutionMessages.pb.h"
    +#include "query_execution/QueryExecutionTypedefs.hpp"
    +#include "query_execution/QueryExecutionUtil.hpp"
    +#include "query_execution/ShiftbossDirectory.hpp"
    +#include "threading/ThreadUtil.hpp"
    +#include "utility/EqualsAnyConstant.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/address.h"
    +#include "tmb/id_typedefs.h"
    +#include "tmb/message_bus.h"
    +#include "tmb/message_style.h"
    +#include "tmb/tagged_message.h"
    +
    +using std::move;
    +using std::size_t;
    +using std::unique_ptr;
    +using std::vector;
    +
    +using tmb::AnnotatedMessage;
    +using tmb::MessageBus;
    +using tmb::TaggedMessage;
    +using tmb::client_id;
    +
    +namespace quickstep {
    +
    +namespace S = serialization;
    +
    +class QueryHandle;
    +
    +ForemanDistributed::ForemanDistributed(
    +    MessageBus *bus,
    +    CatalogDatabaseLite *catalog_database,
    +    const int cpu_id,
    +    const bool profile_individual_workorders)
    +    : ForemanBase(bus, cpu_id),
    +      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
    +  const std::vector<QueryExecutionMessageType> sender_message_types{
    +      kShiftbossRegistrationResponseMessage,
    +      kQueryInitiateMessage,
    +      kWorkOrderMessage,
    +      kInitiateRebuildMessage,
    +      kQueryTeardownMessage,
    +      kSaveQueryResultMessage,
    +      kQueryExecutionSuccessMessage,
    +      kPoisonMessage};
    +
    +  for (const auto message_type : sender_message_types) {
    +    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
    +  }
    +
    +  const std::vector<QueryExecutionMessageType> receiver_message_types{
    +      kShiftbossRegistrationMessage,
    +      kAdmitRequestMessage,
    +      kQueryInitiateResponseMessage,
    +      kCatalogRelationNewBlockMessage,
    +      kDataPipelineMessage,
    +      kInitiateRebuildResponseMessage,
    +      kWorkOrderCompleteMessage,
    +      kRebuildWorkOrderCompleteMessage,
    +      kWorkOrderFeedbackMessage,
    +      kSaveQueryResultResponseMessage,
    +      kPoisonMessage};
    +
    +  for (const auto message_type : receiver_message_types) {
    +    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
    +  }
    +
    +  policy_enforcer_.reset(new PolicyEnforcerDistributed(
    +      foreman_client_id_,
    +      catalog_database_,
    +      &shiftboss_directory_,
    +      bus_,
    +      profile_individual_workorders));
    +}
    +
    +void ForemanDistributed::run() {
    +  if (cpu_id_ >= 0) {
    +    // We can pin the foreman thread to a CPU if specified.
    +    ThreadUtil::BindToCPU(cpu_id_);
    +  }
    +
    +  // Ensure that at least one Shiftboss to register.
    +  if (shiftboss_directory_.empty()) {
    +    const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
    +    const TaggedMessage &tagged_message = annotated_message.tagged_message;
    +    DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
    +    LOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
    +              << "' message from client " << annotated_message.sender;
    +
    +    S::ShiftbossRegistrationMessage proto;
    +    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
    +
    +    processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
    +    DCHECK_EQ(1u, shiftboss_directory_.size());
    +  }
    +
    +  // Event loop
    +  for (;;) {
    +    // Receive() causes this thread to sleep until next message is received.
    +    const AnnotatedMessage annotated_message =
    +        bus_->Receive(foreman_client_id_, 0, true);
    +    const TaggedMessage &tagged_message = annotated_message.tagged_message;
    +    const tmb::message_type_id message_type = tagged_message.message_type();
    +    LOG(INFO) << "ForemanDistributed received typed '" << message_type
    +              << "' message from client " << annotated_message.sender;
    +    switch (message_type) {
    +      case kShiftbossRegistrationMessage: {
    +        S::ShiftbossRegistrationMessage proto;
    +        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
    +
    +        processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
    +        break;
    +      }
    +      case kAdmitRequestMessage: {
    +        const AdmitRequestMessage *request_message =
    +            static_cast<const AdmitRequestMessage*>(tagged_message.message());
    +
    +        const vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
    +        DCHECK(!query_handles.empty());
    +
    +        bool all_queries_admitted = true;
    +        if (query_handles.size() == 1u) {
    +          all_queries_admitted =
    +              policy_enforcer_->admitQuery(query_handles.front());
    +        } else {
    +          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
    +        }
    +        if (!all_queries_admitted) {
    +          LOG(WARNING) << "The scheduler could not admit all the queries";
    +          // TODO(harshad) - Inform the main thread about the failure.
    +        }
    +        break;
    +      }
    +      case kQueryInitiateResponseMessage: {
    +        // TODO(zuyu): check the query id.
    +        break;
    +      }
    +      case kCatalogRelationNewBlockMessage:  // Fall through
    +      case kDataPipelineMessage:
    +      case kRebuildWorkOrderCompleteMessage:
    +      case kWorkOrderCompleteMessage:
    +      case kWorkOrderFeedbackMessage: {
    +        policy_enforcer_->processMessage(tagged_message);
    +        break;
    +      }
    +      case kInitiateRebuildResponseMessage: {
    +        // A unique case in the distributed version.
    +        policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
    +        break;
    +      }
    +      case kSaveQueryResultResponseMessage: {
    +        S::SaveQueryResultResponseMessage proto;
    +        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
    +
    +        processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
    +        break;
    +      }
    +      case kPoisonMessage: {
    +        if (policy_enforcer_->hasQueries()) {
    +          LOG(WARNING) << "Foreman thread exiting while some queries are "
    +                          "under execution or waiting to be admitted";
    +        }
    +
    +        // Shutdown all Shiftbosses.
    +        tmb::Address shiftboss_addresses;
    +        for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
    +          shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
    +        }
    +
    +        tmb::MessageStyle broadcast_style;
    +        broadcast_style.Broadcast(true);
    +
    +        TaggedMessage poison_message(kPoisonMessage);
    +
    +        const MessageBus::SendStatus send_status =
    +            bus_->Send(foreman_client_id_,
    +                       shiftboss_addresses,
    +                       broadcast_style,
    +                       move(poison_message));
    +        DCHECK(send_status == MessageBus::SendStatus::kOK);
    +        return;
    +      }
    +      default:
    +        LOG(FATAL) << "Unknown message type to Foreman";
    --- End diff --
    
    Here and in many other places in this file, can you rename ``Foreman`` to ``ForemanDistributed``? It was probably left over from copying the function out of the single-node Foreman file.



[GitHub] incubator-quickstep issue #93: Added ForemanDistributed.

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/93
  
    I have addressed the comments. Please take another look. Thanks!



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by hbdeshmukh <gi...@git.apache.org>.
Github user hbdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74820761
  
    --- Diff: query_execution/CMakeLists.txt ---
    @@ -33,6 +33,9 @@ if (ENABLE_DISTRIBUTED)
       add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
     endif(ENABLE_DISTRIBUTED)
     add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
    +if (ENABLE_DISTRIBUTED)
    +  add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
    +endif(ENABLE_DISTRIBUTED)
    --- End diff --
    
    Can we use consistent spacing for ``if (ENABLE_DISTRIBUTED)`` and ``endif(ENABLE_DISTRIBUTED)``, i.e. a space before the parenthesis in both or in neither?



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by hbdeshmukh <gi...@git.apache.org>.
Github user hbdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74822822
  
    --- Diff: query_execution/ForemanDistributed.hpp ---
    @@ -0,0 +1,130 @@
    +/**
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + **/
    +
    +#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
    +#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
    +
    +#include <cstddef>
    +#include <cstdio>
    +#include <memory>
    +#include <vector>
    +
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "query_execution/ForemanBase.hpp"
    +#include "query_execution/PolicyEnforcerDistributed.hpp"
    +#include "query_execution/ShiftbossDirectory.hpp"
    +#include "utility/Macros.hpp"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace tmb { class MessageBus; }
    +
    +namespace quickstep {
    +
    +class CatalogDatabaseLite;
    +
    +namespace serialization { class WorkOrderMessage; }
    +
    +/** \addtogroup QueryExecution
    + *  @{
    + */
    +
    +/**
    + * @brief The Foreman receives queries from the main thread, messages from the
    + *        policy enforcer and dispatches the work to Shiftbosses. It also
    + *        receives work completion messages from Shiftbosses.
    + **/
    +class ForemanDistributed final : public ForemanBase {
    + public:
    +  /**
    +   * @brief Constructor.
    +   *
    +   * @param bus A pointer to the TMB.
    +   * @param catalog_database The catalog database where this query is executed.
    +   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
    +   * @param profile_individual_workorders Whether every workorder's execution
    +   *        be profiled or not.
    +   *
    +   * @note If cpu_id is not specified, Foreman thread can be possibly moved
    +   *       around on different CPUs by the OS.
    +  **/
    +  ForemanDistributed(tmb::MessageBus *bus,
    +                     CatalogDatabaseLite *catalog_database,
    +                     const int cpu_id = -1,
    +                     const bool profile_individual_workorders = false);
    +
    +  ~ForemanDistributed() override {}
    +
    +  /**
    +   * @brief Print the results of profiling individual work orders for a given
    +   *        query.
    +   *
    +   * TODO(harshad) - Add the name of the operator to the output.
    +   *
    +   * @param query_id The ID of the query for which the results are to be printed.
    +   * @param out The file stream.
    +   **/
    +  void printWorkOrderProfilingResults(const std::size_t query_id,
    +                                      std::FILE *out) const;
    +
    + protected:
    +  void run() override;
    +
    + private:
    +  /**
    +   * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
    +   *        worker threads.
    +   *
    +   * @param messages The messages to be dispatched.
    +   **/
    +  void dispatchWorkOrderMessages(
    +      const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
    +
    +  /**
    +   * @brief Send the given message to the specified worker.
    +   *
    +   * @param worker_index The logical index of the recipient worker in
    +   *        ShiftbossDirectory.
    +   * @param proto The WorkOrderMessage to be sent.
    +   **/
    +  void sendWorkOrderMessage(const std::size_t worker_index,
    +                            const serialization::WorkOrderMessage &proto);
    +
    +  void processShiftbossRegisterationMessage(const tmb::client_id shiftboss_client_id,
    --- End diff --
    
    Spelling nitpick: ``Registration`` instead of ``Registeration``.



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by hbdeshmukh <gi...@git.apache.org>.
Github user hbdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74822612
  
    --- Diff: query_execution/ForemanDistributed.hpp ---
    @@ -0,0 +1,130 @@
    +/**
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + **/
    +
    +#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
    +#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
    +
    +#include <cstddef>
    +#include <cstdio>
    +#include <memory>
    +#include <vector>
    +
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "query_execution/ForemanBase.hpp"
    +#include "query_execution/PolicyEnforcerDistributed.hpp"
    +#include "query_execution/ShiftbossDirectory.hpp"
    +#include "utility/Macros.hpp"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace tmb { class MessageBus; }
    +
    +namespace quickstep {
    +
    +class CatalogDatabaseLite;
    +
    +namespace serialization { class WorkOrderMessage; }
    +
    +/** \addtogroup QueryExecution
    + *  @{
    + */
    +
    +/**
    + * @brief The Foreman receives queries from the main thread, messages from the
    + *        policy enforcer and dispatches the work to Shiftbosses. It also
    + *        receives work completion messages from Shiftbosses.
    --- End diff --
    
    Question: Does the brief description of the Foreman class still hold true?



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-quickstep/pull/93



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74825130
  
    --- Diff: query_execution/ForemanDistributed.cpp ---
    @@ -0,0 +1,334 @@
    +/**
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + **/
    +
    +#include "query_execution/ForemanDistributed.hpp"
    +
    +#include <cstddef>
    +#include <cstdio>
    +#include <cstdlib>
    +#include <memory>
    +#include <utility>
    +#include <vector>
    +
    +#include "catalog/Catalog.pb.h"
    +#include "catalog/CatalogDatabase.hpp"
    +#include "catalog/CatalogRelation.hpp"
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "query_execution/AdmitRequestMessage.hpp"
    +#include "query_execution/PolicyEnforcerDistributed.hpp"
    +#include "query_execution/QueryExecutionMessages.pb.h"
    +#include "query_execution/QueryExecutionTypedefs.hpp"
    +#include "query_execution/QueryExecutionUtil.hpp"
    +#include "query_execution/ShiftbossDirectory.hpp"
    +#include "threading/ThreadUtil.hpp"
    +#include "utility/EqualsAnyConstant.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/address.h"
    +#include "tmb/id_typedefs.h"
    +#include "tmb/message_bus.h"
    +#include "tmb/message_style.h"
    +#include "tmb/tagged_message.h"
    +
    +using std::move;
    +using std::size_t;
    +using std::unique_ptr;
    +using std::vector;
    +
    +using tmb::AnnotatedMessage;
    +using tmb::MessageBus;
    +using tmb::TaggedMessage;
    +using tmb::client_id;
    +
    +namespace quickstep {
    +
    +namespace S = serialization;
    +
    +class QueryHandle;
    +
    +ForemanDistributed::ForemanDistributed(
    +    MessageBus *bus,
    +    CatalogDatabaseLite *catalog_database,
    +    const int cpu_id,
    +    const bool profile_individual_workorders)
    +    : ForemanBase(bus, cpu_id),
    +      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
    +  const std::vector<QueryExecutionMessageType> sender_message_types{
    +      kShiftbossRegistrationResponseMessage,
    +      kQueryInitiateMessage,
    +      kWorkOrderMessage,
    +      kInitiateRebuildMessage,
    +      kQueryTeardownMessage,
    +      kSaveQueryResultMessage,
    +      kQueryExecutionSuccessMessage,
    +      kPoisonMessage};
    +
    +  for (const auto message_type : sender_message_types) {
    +    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
    +  }
    +
    +  const std::vector<QueryExecutionMessageType> receiver_message_types{
    +      kShiftbossRegistrationMessage,
    +      kAdmitRequestMessage,
    +      kQueryInitiateResponseMessage,
    +      kCatalogRelationNewBlockMessage,
    +      kDataPipelineMessage,
    +      kInitiateRebuildResponseMessage,
    +      kWorkOrderCompleteMessage,
    +      kRebuildWorkOrderCompleteMessage,
    +      kWorkOrderFeedbackMessage,
    +      kSaveQueryResultResponseMessage,
    +      kPoisonMessage};
    +
    +  for (const auto message_type : receiver_message_types) {
    +    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
    +  }
    +
    +  policy_enforcer_.reset(new PolicyEnforcerDistributed(
    +      foreman_client_id_,
    +      catalog_database_,
    +      &shiftboss_directory_,
    +      bus_,
    +      profile_individual_workorders));
    +}
    +
    +void ForemanDistributed::run() {
    +  if (cpu_id_ >= 0) {
    +    // We can pin the foreman thread to a CPU if specified.
    +    ThreadUtil::BindToCPU(cpu_id_);
    +  }
    +
    +  // Ensure that at least one Shiftboss to register.
    +  if (shiftboss_directory_.empty()) {
    +    const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
    +    const TaggedMessage &tagged_message = annotated_message.tagged_message;
    +    DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
    +    LOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
    +              << "' message from client " << annotated_message.sender;
    +
    +    S::ShiftbossRegistrationMessage proto;
    +    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
    +
    +    processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
    +    DCHECK_EQ(1u, shiftboss_directory_.size());
    +  }
    +
    +  // Event loop
    +  for (;;) {
    +    // Receive() causes this thread to sleep until next message is received.
    +    const AnnotatedMessage annotated_message =
    +        bus_->Receive(foreman_client_id_, 0, true);
    +    const TaggedMessage &tagged_message = annotated_message.tagged_message;
    +    const tmb::message_type_id message_type = tagged_message.message_type();
    +    LOG(INFO) << "ForemanDistributed received typed '" << message_type
    +              << "' message from client " << annotated_message.sender;
    +    switch (message_type) {
    +      case kShiftbossRegistrationMessage: {
    +        S::ShiftbossRegistrationMessage proto;
    +        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
    +
    +        processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
    +        break;
    +      }
    +      case kAdmitRequestMessage: {
    +        const AdmitRequestMessage *request_message =
    +            static_cast<const AdmitRequestMessage*>(tagged_message.message());
    +
    +        const vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
    +        DCHECK(!query_handles.empty());
    +
    +        bool all_queries_admitted = true;
    +        if (query_handles.size() == 1u) {
    +          all_queries_admitted =
    +              policy_enforcer_->admitQuery(query_handles.front());
    +        } else {
    +          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
    +        }
    +        if (!all_queries_admitted) {
    +          LOG(WARNING) << "The scheduler could not admit all the queries";
    +          // TODO(harshad) - Inform the main thread about the failure.
    +        }
    +        break;
    +      }
    +      case kQueryInitiateResponseMessage: {
    +        // TODO(zuyu): check the query id.
    +        break;
    +      }
    +      case kCatalogRelationNewBlockMessage:  // Fall through
    +      case kDataPipelineMessage:
    +      case kRebuildWorkOrderCompleteMessage:
    +      case kWorkOrderCompleteMessage:
    +      case kWorkOrderFeedbackMessage: {
    +        policy_enforcer_->processMessage(tagged_message);
    +        break;
    +      }
    +      case kInitiateRebuildResponseMessage: {
    +        // A unique case in the distributed version.
    +        policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
    +        break;
    +      }
    +      case kSaveQueryResultResponseMessage: {
    +        S::SaveQueryResultResponseMessage proto;
    +        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
    +
    +        processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
    +        break;
    +      }
    +      case kPoisonMessage: {
    +        if (policy_enforcer_->hasQueries()) {
    +          LOG(WARNING) << "Foreman thread exiting while some queries are "
    +                          "under execution or waiting to be admitted";
    +        }
    +
    +        // Shutdown all Shiftbosses.
    +        tmb::Address shiftboss_addresses;
    +        for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
    +          shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
    +        }
    +
    +        tmb::MessageStyle broadcast_style;
    +        broadcast_style.Broadcast(true);
    +
    +        TaggedMessage poison_message(kPoisonMessage);
    +
    +        const MessageBus::SendStatus send_status =
    +            bus_->Send(foreman_client_id_,
    +                       shiftboss_addresses,
    +                       broadcast_style,
    +                       move(poison_message));
    +        DCHECK(send_status == MessageBus::SendStatus::kOK);
    +        return;
    +      }
    +      default:
    +        LOG(FATAL) << "Unknown message type to Foreman";
    +    }
    +
    +    if (canCollectNewMessages(message_type)) {
    +      vector<unique_ptr<S::WorkOrderMessage>> new_messages;
    +      policy_enforcer_->getWorkOrderProtoMessages(&new_messages);
    +      dispatchWorkOrderMessages(new_messages);
    +    }
    +  }
    +}
    +
    +bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
    +  return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
    +                                        kCatalogRelationNewBlockMessage,
    +                                        kWorkOrderFeedbackMessage) &&
    +         // TODO(zuyu): Multiple Shiftbosses support.
    +         !shiftboss_directory_.hasReachedCapacity(0);
    --- End diff --
    
    No. Alternatively, once we support multiple Shiftbosses, it will be replaced with a variable such as `shiftboss_index`.
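
To illustrate the reply above, here is a minimal sketch of what a capacity check parameterized by `shiftboss_index` might look like. It is not the PR's code: `ToyShiftbossDirectory`, `add()`, and `incrementQueued()` are hypothetical stand-ins, since the real `ShiftbossDirectory` is not shown in this thread, and the message-type filter from the original `canCollectNewMessages` is left out for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for ShiftbossDirectory. Every member here is an
// assumption made for illustration, not the API from the pull request.
class ToyShiftbossDirectory {
 public:
  // Registers a Shiftboss with the given capacity and returns its index.
  std::size_t add(const std::size_t work_order_capacity) {
    entries_.push_back(Entry{work_order_capacity, 0u});
    return entries_.size() - 1;
  }

  std::size_t size() const { return entries_.size(); }

  // True once the Shiftboss holds as many work orders as it can accept.
  bool hasReachedCapacity(const std::size_t shiftboss_index) const {
    assert(shiftboss_index < entries_.size());
    const Entry &entry = entries_[shiftboss_index];
    return entry.queued >= entry.capacity;
  }

  // Records that one more work order was sent to the Shiftboss.
  void incrementQueued(const std::size_t shiftboss_index) {
    assert(shiftboss_index < entries_.size());
    ++entries_[shiftboss_index].queued;
  }

 private:
  struct Entry {
    std::size_t capacity;
    std::size_t queued;
  };
  std::vector<Entry> entries_;
};

// The hard-coded 0 from the diff becomes a parameter, so the caller
// decides which Shiftboss to query.
bool canCollectNewMessages(const ToyShiftbossDirectory &directory,
                           const std::size_t shiftboss_index) {
  return !directory.hasReachedCapacity(shiftboss_index);
}
```

With a parameter in place of the literal, a caller could loop over `0 .. directory.size() - 1` and dispatch only to the Shiftbosses that still have room. How the real scheduler would pick an index is not discussed in this thread.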



[GitHub] incubator-quickstep pull request #93: Added ForemanDistributed.

Posted by hbdeshmukh <gi...@git.apache.org>.
Github user hbdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/93#discussion_r74823182
  
    --- Diff: query_execution/ForemanDistributed.cpp ---
    @@ -0,0 +1,334 @@
    +/**
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + **/
    +
    +#include "query_execution/ForemanDistributed.hpp"
    +
    +#include <cstddef>
    +#include <cstdio>
    +#include <cstdlib>
    +#include <memory>
    +#include <utility>
    +#include <vector>
    +
    +#include "catalog/Catalog.pb.h"
    +#include "catalog/CatalogDatabase.hpp"
    +#include "catalog/CatalogRelation.hpp"
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "query_execution/AdmitRequestMessage.hpp"
    +#include "query_execution/PolicyEnforcerDistributed.hpp"
    +#include "query_execution/QueryExecutionMessages.pb.h"
    +#include "query_execution/QueryExecutionTypedefs.hpp"
    +#include "query_execution/QueryExecutionUtil.hpp"
    +#include "query_execution/ShiftbossDirectory.hpp"
    +#include "threading/ThreadUtil.hpp"
    +#include "utility/EqualsAnyConstant.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/address.h"
    +#include "tmb/id_typedefs.h"
    +#include "tmb/message_bus.h"
    +#include "tmb/message_style.h"
    +#include "tmb/tagged_message.h"
    +
    +using std::move;
    +using std::size_t;
    +using std::unique_ptr;
    +using std::vector;
    +
    +using tmb::AnnotatedMessage;
    +using tmb::MessageBus;
    +using tmb::TaggedMessage;
    +using tmb::client_id;
    +
    +namespace quickstep {
    +
    +namespace S = serialization;
    +
    +class QueryHandle;
    +
    +ForemanDistributed::ForemanDistributed(
    +    MessageBus *bus,
    +    CatalogDatabaseLite *catalog_database,
    +    const int cpu_id,
    +    const bool profile_individual_workorders)
    +    : ForemanBase(bus, cpu_id),
    +      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
    +  const std::vector<QueryExecutionMessageType> sender_message_types{
    +      kShiftbossRegistrationResponseMessage,
    +      kQueryInitiateMessage,
    +      kWorkOrderMessage,
    +      kInitiateRebuildMessage,
    +      kQueryTeardownMessage,
    +      kSaveQueryResultMessage,
    +      kQueryExecutionSuccessMessage,
    +      kPoisonMessage};
    +
    +  for (const auto message_type : sender_message_types) {
    +    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
    +  }
    +
    +  const std::vector<QueryExecutionMessageType> receiver_message_types{
    +      kShiftbossRegistrationMessage,
    +      kAdmitRequestMessage,
    +      kQueryInitiateResponseMessage,
    +      kCatalogRelationNewBlockMessage,
    +      kDataPipelineMessage,
    +      kInitiateRebuildResponseMessage,
    +      kWorkOrderCompleteMessage,
    +      kRebuildWorkOrderCompleteMessage,
    +      kWorkOrderFeedbackMessage,
    +      kSaveQueryResultResponseMessage,
    +      kPoisonMessage};
    +
    +  for (const auto message_type : receiver_message_types) {
    +    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
    +  }
    +
    +  policy_enforcer_.reset(new PolicyEnforcerDistributed(
    +      foreman_client_id_,
    +      catalog_database_,
    +      &shiftboss_directory_,
    +      bus_,
    +      profile_individual_workorders));
    +}
    +
    +void ForemanDistributed::run() {
    +  if (cpu_id_ >= 0) {
    +    // We can pin the foreman thread to a CPU if specified.
    +    ThreadUtil::BindToCPU(cpu_id_);
    +  }
    +
    +  // Wait for at least one Shiftboss to register.
    +  if (shiftboss_directory_.empty()) {
    +    const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
    +    const TaggedMessage &tagged_message = annotated_message.tagged_message;
    +    DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
    +    LOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
    +              << "' message from client " << annotated_message.sender;
    +
    +    S::ShiftbossRegistrationMessage proto;
    +    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
    +
    +    processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity());
    +    DCHECK_EQ(1u, shiftboss_directory_.size());
    +  }
    +
    +  // Event loop
    +  for (;;) {
    +    // Receive() causes this thread to sleep until the next message is received.
    +    const AnnotatedMessage annotated_message =
    +        bus_->Receive(foreman_client_id_, 0, true);
    +    const TaggedMessage &tagged_message = annotated_message.tagged_message;
    +    const tmb::message_type_id message_type = tagged_message.message_type();
    +    LOG(INFO) << "ForemanDistributed received typed '" << message_type
    --- End diff --
    
    Do you want to convert this LOG call to DLOG? ForemanDistributed will receive a large number of messages, so logging every one of them at INFO level could get noisy.



[GitHub] incubator-quickstep issue #93: Added ForemanDistributed.

Posted by hbdeshmukh <gi...@git.apache.org>.
Github user hbdeshmukh commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/93
  
    Merged. 



[GitHub] incubator-quickstep issue #93: Added ForemanDistributed.

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/93
  
    Thanks!

