You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/08/09 17:45:19 UTC
[1/3] incubator-quickstep git commit: Deserialized Window Aggr
WorkOrder. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/msg-cleanup f484189fc -> 616899621 (forced update)
Deserialized Window Aggr WorkOrder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/85e02de4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/85e02de4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/85e02de4
Branch: refs/heads/msg-cleanup
Commit: 85e02de49205409accfef3737dadfe95aad1f5c0
Parents: 658cb61
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 23:14:08 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Mon Aug 8 23:14:08 2016 -0700
----------------------------------------------------------------------
relational_operators/CMakeLists.txt | 1 +
relational_operators/WorkOrderFactory.cpp | 25 +++++++++++++++++++++++++
2 files changed, 26 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/85e02de4/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 9696392..43a42f9 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -467,6 +467,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
quickstep_relationaloperators_TableGeneratorOperator
quickstep_relationaloperators_TextScanOperator
quickstep_relationaloperators_UpdateOperator
+ quickstep_relationaloperators_WindowAggregationOperator
quickstep_relationaloperators_WorkOrder_proto
quickstep_storage_StorageBlockInfo
quickstep_utility_Macros
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/85e02de4/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index f920cac..721d735 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -46,6 +46,7 @@
#include "relational_operators/TableGeneratorOperator.hpp"
#include "relational_operators/TextScanOperator.hpp"
#include "relational_operators/UpdateOperator.hpp"
+#include "relational_operators/WindowAggregationOperator.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/StorageBlockInfo.hpp"
@@ -419,6 +420,22 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
shiftboss_client_id,
bus);
}
+ case serialization::WINDOW_AGGREGATION: {  // Rebuild a WindowAggregationWorkOrder from its proto.
+ LOG(INFO) << "Creating WindowAggregationWorkOrder";
+ vector<block_id> blocks;  // Filled from the repeated block_ids extension.
+ for (int i = 0; i < proto.ExtensionSize(serialization::WindowAggregationWorkOrder::block_ids); ++i) {
+ blocks.push_back(
+ proto.GetExtension(serialization::WindowAggregationWorkOrder::block_ids, i));
+ }
+
+ return new WindowAggregationWorkOrder(
+ proto.query_id(),
+ query_context->getWindowAggregationState(
+ proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)),
+ move(blocks),
+ query_context->getInsertDestination(  // Read the same extension that ProtoIsValid() checks for this type.
+ proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index)));
+ }
default:
LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto";
}
@@ -697,6 +714,14 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
proto.HasExtension(serialization::UpdateWorkOrder::operator_index) &&
proto.HasExtension(serialization::UpdateWorkOrder::block_id);
}
+ case serialization::WINDOW_AGGREGATION: {
+ return proto.HasExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index) &&
+ query_context.isValidWindowAggregationStateId(
+ proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)) &&
+ proto.HasExtension(serialization::WindowAggregationWorkOrder::insert_destination_index) &&
+ query_context.isValidInsertDestinationId(
+ proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index));
+ }
default:
return false;
}
[3/3] incubator-quickstep git commit: Removed an unused message type.
Posted by zu...@apache.org.
Removed an unused message type.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/61689962
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/61689962
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/61689962
Branch: refs/heads/msg-cleanup
Commit: 6168996216af8278d5c789c67aa4ec8325fab483
Parents: 2c0ce6a
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 15:32:34 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Aug 9 10:44:50 2016 -0700
----------------------------------------------------------------------
query_execution/ForemanSingleNode.cpp | 4 +---
query_execution/PolicyEnforcerBase.cpp | 13 -------------
query_execution/QueryExecutionMessages.proto | 5 -----
query_execution/QueryExecutionTypedefs.hpp | 1 -
query_execution/Shiftboss.cpp | 1 -
query_execution/Shiftboss.hpp | 1 -
query_execution/Worker.hpp | 1 -
7 files changed, 1 insertion(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/ForemanSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp
index 23db379..d064a6f 100644
--- a/query_execution/ForemanSingleNode.cpp
+++ b/query_execution/ForemanSingleNode.cpp
@@ -87,7 +87,6 @@ ForemanSingleNode::ForemanSingleNode(
kPoisonMessage,
kRebuildWorkOrderCompleteMessage,
kWorkOrderFeedbackMessage,
- kWorkOrdersAvailableMessage,
kWorkOrderCompleteMessage};
for (const auto message_type : receiver_message_types) {
@@ -122,8 +121,7 @@ void ForemanSingleNode::run() {
case kDataPipelineMessage:
case kRebuildWorkOrderCompleteMessage:
case kWorkOrderCompleteMessage:
- case kWorkOrderFeedbackMessage:
- case kWorkOrdersAvailableMessage: {
+ case kWorkOrderFeedbackMessage: {
policy_enforcer_->processMessage(tagged_message);
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 78f7b44..4174bd6 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -107,19 +107,6 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
op_index, proto.block_id(), proto.relation_id());
break;
}
- case kWorkOrdersAvailableMessage: {
- serialization::WorkOrdersAvailableMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(),
- tagged_message.message_bytes()));
- query_id = proto.query_id();
- DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
-
- op_index = proto.operator_index();
-
- // Check if new work orders are available.
- admitted_queries_[query_id]->fetchNormalWorkOrders(op_index);
- break;
- }
case kWorkOrderFeedbackMessage: {
WorkOrder::FeedbackMessage msg(
const_cast<void *>(tagged_message.message()),
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 20b684e..060efa1 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -74,11 +74,6 @@ message DataPipelineMessage {
required uint64 query_id = 4;
}
-message WorkOrdersAvailableMessage {
- required uint64 operator_index = 1;
- required uint64 query_id = 2;
-}
-
// Distributed version related messages.
message ShiftbossRegistrationMessage {
// The total Work Order processing capacity in Shiftboss, which equals to the
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index d154d84..33a93b0 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -69,7 +69,6 @@ enum QueryExecutionMessageType : message_type_id {
kWorkOrderCompleteMessage, // From Worker to Foreman.
kCatalogRelationNewBlockMessage, // From InsertDestination to Foreman.
kDataPipelineMessage, // From InsertDestination or some WorkOrders to Foreman.
- kWorkOrdersAvailableMessage, // From some WorkOrders to Foreman.
kWorkOrderFeedbackMessage, // From some WorkOrders to Foreman on behalf of
// their corresponding RelationalOperators.
kRebuildWorkOrderMessage, // From Foreman to Worker.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 24c91fe..bd83dd4 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -152,7 +152,6 @@ void Shiftboss::run() {
case kWorkOrderCompleteMessage: // Fall through.
case kRebuildWorkOrderCompleteMessage:
case kDataPipelineMessage:
- case kWorkOrdersAvailableMessage:
case kWorkOrderFeedbackMessage: {
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') forwarded typed '" << annotated_message.tagged_message.message_type()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 9464a4d..30a8d1a 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -103,7 +103,6 @@ class Shiftboss : public Thread {
// Message sent to Foreman.
bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
- bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrdersAvailableMessage);
bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
// Forward the following message types from Foreman to Workers.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/61689962/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index 44a7447..aa39bb3 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -68,7 +68,6 @@ class Worker : public Thread {
kRebuildWorkOrderCompleteMessage);
bus_->RegisterClientAsSender(worker_client_id_, kCatalogRelationNewBlockMessage);
bus_->RegisterClientAsSender(worker_client_id_, kDataPipelineMessage);
- bus_->RegisterClientAsSender(worker_client_id_, kWorkOrdersAvailableMessage);
bus_->RegisterClientAsSender(worker_client_id_, kWorkOrderFeedbackMessage);
bus_->RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage);
[2/3] incubator-quickstep git commit: Fixed bugs in creating
WorkOrderProtos.
Posted by zu...@apache.org.
Fixed bugs in creating WorkOrderProtos.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2c0ce6a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2c0ce6a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2c0ce6a3
Branch: refs/heads/msg-cleanup
Commit: 2c0ce6a3bcf2ec40b0c32d077552cda3e225f787
Parents: 85e02de
Author: Zuyu Zhang <zu...@twitter.com>
Authored: Mon Aug 8 18:42:32 2016 -0700
Committer: Zuyu Zhang <zu...@twitter.com>
Committed: Tue Aug 9 10:42:25 2016 -0700
----------------------------------------------------------------------
relational_operators/HashJoinOperator.cpp | 1 +
relational_operators/WorkOrderFactory.cpp | 4 +---
2 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2c0ce6a3/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 7851f41..779c0fe 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -393,6 +393,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer *
serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const block_id block) {
serialization::WorkOrder *proto = new serialization::WorkOrder;
proto->set_work_order_type(serialization::HASH_JOIN);
+ proto->set_query_id(query_id_);
proto->SetExtension(serialization::HashJoinWorkOrder::hash_join_work_order_type,
serialization::HashJoinWorkOrder::HASH_OUTER_JOIN);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2c0ce6a3/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 721d735..7d7af59 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -533,13 +533,11 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
return false;
}
- const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) {
const attribute_id attr_id =
proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i);
- if (!build_relation.hasAttributeWithId(attr_id) ||
- !probe_relation.hasAttributeWithId(attr_id)) {
+ if (!probe_relation.hasAttributeWithId(attr_id)) {
return false;
}
}