You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/09 17:45:19 UTC

[1/3] incubator-quickstep git commit: Deserialized Window Aggr WorkOrder. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/msg-cleanup f484189fc -> 616899621 (forced update)


Deserialized Window Aggr WorkOrder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/85e02de4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/85e02de4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/85e02de4

Branch: refs/heads/msg-cleanup
Commit: 85e02de49205409accfef3737dadfe95aad1f5c0
Parents: 658cb61
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 23:14:08 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 8 23:14:08 2016 -0700

----------------------------------------------------------------------
 relational_operators/CMakeLists.txt       |  1 +
 relational_operators/WorkOrderFactory.cpp | 25 +++++++++++++++++++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/85e02de4/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 9696392..43a42f9 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -467,6 +467,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_UpdateOperator
+                      quickstep_relationaloperators_WindowAggregationOperator
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/85e02de4/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index f920cac..721d735 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -46,6 +46,7 @@
 #include "relational_operators/TableGeneratorOperator.hpp"
 #include "relational_operators/TextScanOperator.hpp"
 #include "relational_operators/UpdateOperator.hpp"
+#include "relational_operators/WindowAggregationOperator.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/StorageBlockInfo.hpp"
 
@@ -419,6 +420,22 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           shiftboss_client_id,
           bus);
     }
+    case serialization::WINDOW_AGGREGATION: {
+      LOG(INFO) << "Creating WindowAggregationWorkOrder";
+      vector<block_id> blocks;
+      for (int i = 0; i < proto.ExtensionSize(serialization::WindowAggregationWorkOrder::block_ids); ++i) {
+        blocks.push_back(
+            proto.GetExtension(serialization::WindowAggregationWorkOrder::block_ids, i));
+      }
+      // Both indices are read from the WindowAggregationWorkOrder extensions that ProtoIsValid() checks.
+      return new WindowAggregationWorkOrder(
+          proto.query_id(),
+          query_context->getWindowAggregationState(
+              proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)),
+          move(blocks),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index)));
+    }
     default:
       LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto";
   }
@@ -697,6 +714,14 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
              proto.HasExtension(serialization::UpdateWorkOrder::operator_index) &&
              proto.HasExtension(serialization::UpdateWorkOrder::block_id);
     }
+    case serialization::WINDOW_AGGREGATION: {
+      return proto.HasExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index) &&
+             query_context.isValidWindowAggregationStateId(
+                 proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)) &&
+             proto.HasExtension(serialization::WindowAggregationWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index));
+    }
     default:
       return false;
   }


[3/3] incubator-quickstep git commit: Removed an unused message type.

Posted by zu...@apache.org.
Removed an unused message type.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/61689962
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/61689962
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/61689962

Branch: refs/heads/msg-cleanup
Commit: 6168996216af8278d5c789c67aa4ec8325fab483
Parents: 2c0ce6a
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 15:32:34 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Aug 9 10:44:50 2016 -0700

----------------------------------------------------------------------
 query_execution/ForemanSingleNode.cpp        |  4 +---
 query_execution/PolicyEnforcerBase.cpp       | 13 -------------
 query_execution/QueryExecutionMessages.proto |  5 -----
 query_execution/QueryExecutionTypedefs.hpp   |  1 -
 query_execution/Shiftboss.cpp                |  1 -
 query_execution/Shiftboss.hpp                |  1 -
 query_execution/Worker.hpp                   |  1 -
 7 files changed, 1 insertion(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 23db379..d064a6f 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -87,7 +87,6 @@ ForemanSingleNode::ForemanSingleNode(
       kPoisonMessage,
       kRebuildWorkOrderCompleteMessage,
       kWorkOrderFeedbackMessage,
-      kWorkOrdersAvailableMessage,
       kWorkOrderCompleteMessage};
 
   for (const auto message_type : receiver_message_types) {
@@ -122,8 +121,7 @@ void ForemanSingleNode::run() {
       case kDataPipelineMessage:
       case kRebuildWorkOrderCompleteMessage:
       case kWorkOrderCompleteMessage:
-      case kWorkOrderFeedbackMessage:
-      case kWorkOrdersAvailableMessage: {
+      case kWorkOrderFeedbackMessage: {
         policy_enforcer_->processMessage(tagged_message);
         break;
       }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 78f7b44..4174bd6 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -107,19 +107,6 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
           op_index, proto.block_id(), proto.relation_id());
       break;
     }
-    case kWorkOrdersAvailableMessage: {
-      serialization::WorkOrdersAvailableMessage proto;
-      CHECK(proto.ParseFromArray(tagged_message.message(),
-                                 tagged_message.message_bytes()));
-      query_id = proto.query_id();
-      DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
-      op_index = proto.operator_index();
-
-      // Check if new work orders are available.
-      admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
-      break;
-    }
     case kWorkOrderFeedbackMessage: {
       WorkOrder::FeedbackMessage msg(
           const_cast<void *>(tagged_message.message()),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 20b684e..060efa1 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -74,11 +74,6 @@ message DataPipelineMessage {
   required uint64 query_id = 4;
 }
 
-message WorkOrdersAvailableMessage {
-  required uint64 operator_index = 1;
-  required uint64 query_id = 2;
-}
-
 // Distributed version related messages.
 message ShiftbossRegistrationMessage {
   // The total Work Order processing capacity in Shiftboss, which equals to the

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index d154d84..33a93b0 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -69,7 +69,6 @@ enum QueryExecutionMessageType : message_type_id {
   kWorkOrderCompleteMessage,  // From Worker to Foreman.
   kCatalogRelationNewBlockMessage,  // From InsertDestination to Foreman.
   kDataPipelineMessage,  // From InsertDestination or some WorkOrders to Foreman.
-  kWorkOrdersAvailableMessage,  // From some WorkOrders to Foreman.
   kWorkOrderFeedbackMessage,  // From some WorkOrders to Foreman on behalf of
                               // their corresponding RelationalOperators.
   kRebuildWorkOrderMessage,  // From Foreman to Worker.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 24c91fe..bd83dd4 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -152,7 +152,6 @@ void Shiftboss::run() {
       case kWorkOrderCompleteMessage:  // Fall through.
       case kRebuildWorkOrderCompleteMessage:
       case kDataPipelineMessage:
-      case kWorkOrdersAvailableMessage:
       case kWorkOrderFeedbackMessage: {
         LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
                   << "') forwarded typed '" << annotated_message.tagged_message.message_type()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 9464a4d..30a8d1a 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -103,7 +103,6 @@ class Shiftboss : public Thread {
     // Message sent to Foreman.
     bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrdersAvailableMessage);
     bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
 
     // Forward the following message types from Foreman to Workers.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index 44a7447..aa39bb3 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -68,7 +68,6 @@ class Worker : public Thread {
                                  kRebuildWorkOrderCompleteMessage);
     bus_->RegisterClientAsSender(worker_client_id_, kCatalogRelationNewBlockMessage);
     bus_->RegisterClientAsSender(worker_client_id_, kDataPipelineMessage);
-    bus_->RegisterClientAsSender(worker_client_id_, kWorkOrdersAvailableMessage);
     bus_->RegisterClientAsSender(worker_client_id_, kWorkOrderFeedbackMessage);
 
     bus_->RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage);


[2/3] incubator-quickstep git commit: Fixed bugs in creating WorkOrderProtos.

Posted by zu...@apache.org.
Fixed bugs in creating WorkOrderProtos.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2c0ce6a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2c0ce6a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2c0ce6a3

Branch: refs/heads/msg-cleanup
Commit: 2c0ce6a3bcf2ec40b0c32d077552cda3e225f787
Parents: 85e02de
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 18:42:32 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Aug 9 10:42:25 2016 -0700

----------------------------------------------------------------------
 relational_operators/HashJoinOperator.cpp | 1 +
 relational_operators/WorkOrderFactory.cpp | 4 +---
 2 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2c0ce6a3/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 7851f41..779c0fe 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -393,6 +393,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer *
 serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const block_id block) {
   serialization::WorkOrder *proto = new serialization::WorkOrder;
   proto->set_work_order_type(serialization::HASH_JOIN);
+  proto->set_query_id(query_id_);
 
   proto->SetExtension(serialization::HashJoinWorkOrder::hash_join_work_order_type,
                       serialization::HashJoinWorkOrder::HASH_OUTER_JOIN);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2c0ce6a3/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 721d735..7d7af59 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -533,13 +533,11 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
         return false;
       }
 
-      const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
       const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
       for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) {
         const attribute_id attr_id =
             proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i);
-        if (!build_relation.hasAttributeWithId(attr_id) ||
-            !probe_relation.hasAttributeWithId(attr_id)) {
+        if (!probe_relation.hasAttributeWithId(attr_id)) {
           return false;
         }
       }