You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/08 13:21:50 UTC

[50/50] [abbrv] incubator-quickstep git commit: Merge branch 'master' into query-manager-used-in-foreman

Merge branch 'master' into query-manager-used-in-foreman

Conflicts:
	cli/QuickstepCli.cpp
	query_execution/CMakeLists.txt
	query_execution/QueryContext.proto
	query_execution/QueryExecutionTypedefs.hpp
	query_optimizer/ExecutionGenerator.hpp
	storage/StorageManager.hpp


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bef0ae1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bef0ae1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bef0ae1d

Branch: refs/heads/query-manager-used-in-foreman
Commit: bef0ae1d83649b1d1a288f94e4be6bca28ea6037
Parents: 24c93ca 69c6551
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Wed Jun 8 08:15:49 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Wed Jun 8 08:15:49 2016 -0500

----------------------------------------------------------------------
 CMakeLists.txt                                  |   17 +-
 DISCLAIMER                                      |   11 +
 README.md                                       |   25 +-
 catalog/CMakeLists.txt                          |   16 +-
 catalog/Catalog.proto                           |   13 +-
 catalog/CatalogRelation.cpp                     |   11 +
 catalog/CatalogRelation.hpp                     |   52 +-
 catalog/CatalogRelationStatistics.cpp           |   49 +
 catalog/CatalogRelationStatistics.hpp           |  122 +
 cli/CMakeLists.txt                              |   23 +-
 cli/CommandExecutor.cpp                         |  189 +-
 cli/CommandExecutor.hpp                         |   28 +-
 cli/DefaultsConfigurator.hpp                    |   16 +
 cli/InputParserUtil.cpp                         |    1 +
 cli/PrintToScreen.cpp                           |   33 +
 cli/PrintToScreen.hpp                           |   31 +
 cli/QuickstepCli.cpp                            |   57 +-
 cli/tests/CommandExecutorTestRunner.cpp         |    5 +
 cli/tests/command_executor/D.test               |   35 +-
 cli/tests/command_executor/Dt.test              |   36 +-
 .../aggregation/AggregationConcreteHandle.hpp   |  105 +
 expressions/aggregation/AggregationHandle.hpp   |   15 +-
 .../aggregation/AggregationHandleAvg.cpp        |    9 +
 .../aggregation/AggregationHandleAvg.hpp        |    4 +
 .../aggregation/AggregationHandleCount.cpp      |   11 +
 .../aggregation/AggregationHandleCount.hpp      |    4 +
 .../aggregation/AggregationHandleDistinct.hpp   |    7 +
 .../aggregation/AggregationHandleMax.cpp        |    9 +
 .../aggregation/AggregationHandleMax.hpp        |    4 +
 .../aggregation/AggregationHandleMin.cpp        |    9 +
 .../aggregation/AggregationHandleMin.hpp        |    4 +
 .../aggregation/AggregationHandleSum.cpp        |   11 +-
 .../aggregation/AggregationHandleSum.hpp        |    4 +
 expressions/aggregation/CMakeLists.txt          |    2 +
 .../tests/AggregationHandleAvg_unittest.cpp     |  109 +
 .../tests/AggregationHandleCount_unittest.cpp   |  126 +-
 .../tests/AggregationHandleMax_unittest.cpp     |  122 +
 .../tests/AggregationHandleMin_unittest.cpp     |  121 +
 .../tests/AggregationHandleSum_unittest.cpp     |  124 +
 parser/ParseBasicExpressions.cpp                |   31 +
 parser/ParseBasicExpressions.hpp                |   77 +
 parser/ParseExpression.hpp                      |    1 +
 parser/ParseIndexProperties.hpp                 |    2 +-
 parser/SqlLexer.lpp                             |    2 +
 parser/SqlParser.ypp                            |   16 +
 parser/preprocessed/SqlLexer_gen.cpp            | 1173 +++----
 parser/preprocessed/SqlLexer_gen.hpp            |    2 +-
 parser/preprocessed/SqlParser_gen.cpp           | 2920 +++++++++---------
 parser/preprocessed/SqlParser_gen.hpp           |  126 +-
 parser/tests/Index.test                         |   11 +
 query_execution/AdmitRequestMessage.hpp         |    2 +
 query_execution/BlockLocator.cpp                |  223 ++
 query_execution/BlockLocator.hpp                |  125 +
 query_execution/CMakeLists.txt                  |   51 +
 query_execution/Foreman.cpp                     |    3 +
 query_execution/PolicyEnforcer.cpp              |    9 +-
 query_execution/QueryContext.cpp                |   16 +-
 query_execution/QueryContext.hpp                |   57 +-
 query_execution/QueryContext.proto              |   20 +-
 query_execution/QueryExecutionMessages.proto    |   34 +
 query_execution/QueryExecutionTypedefs.hpp      |   20 +-
 query_execution/QueryExecutionUtil.hpp          |   45 +-
 query_execution/tests/BlockLocator_unittest.cpp |  270 ++
 query_optimizer/CMakeLists.txt                  |   16 +-
 query_optimizer/ExecutionGenerator.cpp          |  139 +-
 query_optimizer/ExecutionGenerator.hpp          |   15 +-
 query_optimizer/ExecutionHeuristics.cpp         |  127 +
 query_optimizer/ExecutionHeuristics.hpp         |  155 +
 query_optimizer/PhysicalGenerator.cpp           |   25 +
 query_optimizer/QueryProcessor.hpp              |   10 +
 query_optimizer/cost_model/CMakeLists.txt       |   33 +-
 query_optimizer/cost_model/SimpleCostModel.cpp  |    9 +
 query_optimizer/cost_model/SimpleCostModel.hpp  |    8 +-
 .../cost_model/StarSchemaSimpleCostModel.cpp    |  258 ++
 .../cost_model/StarSchemaSimpleCostModel.hpp    |  115 +
 .../logical/SharedSubplanReference.cpp          |    7 +-
 .../logical/SharedSubplanReference.hpp          |   34 +-
 query_optimizer/physical/Aggregate.cpp          |   14 +
 .../physical/SharedSubplanReference.cpp         |    5 +
 .../physical/SharedSubplanReference.hpp         |   34 +-
 query_optimizer/resolver/CMakeLists.txt         |    1 +
 query_optimizer/resolver/Resolver.cpp           |   84 +-
 query_optimizer/rules/CMakeLists.txt            |   20 +-
 query_optimizer/rules/RuleHelper.cpp            |   18 +-
 .../StarSchemaHashJoinOrderOptimization.cpp     |  309 ++
 .../StarSchemaHashJoinOrderOptimization.hpp     |  136 +
 query_optimizer/strategy/OneToOne.cpp           |    1 +
 query_optimizer/tests/CMakeLists.txt            |   17 +
 .../tests/ExecutionGeneratorTestRunner.cpp      |   17 +-
 .../tests/ExecutionHeuristics_unittest.cpp      |  301 ++
 query_optimizer/tests/OptimizerTextTest.cpp     |   14 +
 .../tests/execution_generator/CMakeLists.txt    |    2 +-
 .../tests/execution_generator/Select.test       |  101 +-
 .../tests/logical_generator/Create.test         |    4 +-
 .../tests/logical_generator/Select.test         |  106 +-
 .../tests/physical_generator/Create.test        |    6 +-
 .../tests/physical_generator/Select.test        |  181 +-
 query_optimizer/tests/resolver/Create.test      |   28 +-
 query_optimizer/tests/resolver/Select.test      |  259 +-
 relational_operators/CMakeLists.txt             |    8 +
 relational_operators/HashJoinOperator.cpp       |    7 +-
 relational_operators/SelectOperator.cpp         |  129 +-
 relational_operators/SelectOperator.hpp         |  115 +-
 relational_operators/WorkOrder.proto            |   98 +-
 relational_operators/WorkOrderFactory.cpp       |  362 +--
 storage/AggregationOperationState.cpp           |  112 +-
 storage/AggregationOperationState.hpp           |    9 +
 storage/AggregationOperationState.proto         |    5 +
 storage/CMakeLists.txt                          |  116 +-
 storage/DataExchange.proto                      |   31 +
 storage/DataExchangerAsync.cpp                  |  165 +
 storage/DataExchangerAsync.hpp                  |   97 +
 storage/EvictionPolicy.hpp                      |    6 +
 storage/FileManager.hpp                         |    9 +-
 storage/FileManagerHdfs.cpp                     |   59 +-
 storage/FileManagerPosix.cpp                    |   55 +-
 storage/FileManagerPosix.hpp                    |    6 +-
 storage/FileManagerWindows.cpp                  |   55 +-
 storage/FileManagerWindows.hpp                  |    4 +-
 storage/HashTable.hpp                           |  104 +
 storage/HashTable.proto                         |    8 +
 storage/HashTableFactory.hpp                    |   44 +-
 storage/HashTablePool.hpp                       |  166 +
 storage/PreloaderThread.cpp                     |   98 +-
 storage/PreloaderThread.hpp                     |   30 +
 storage/SMAIndexSubBlock.cpp                    |   48 +-
 storage/SMAIndexSubBlock.hpp                    |    3 +-
 .../SimpleScalarSeparateChainingHashTable.hpp   |   10 +-
 storage/StorageBlock.cpp                        |    5 +
 storage/StorageBlock.hpp                        |   11 +-
 storage/StorageConstants.hpp                    |   15 +-
 storage/StorageManager.cpp                      |  473 ++-
 storage/StorageManager.hpp                      |  182 +-
 storage/tests/DataExchange_unittest.cpp         |  240 ++
 storage/tests/SMAIndexSubBlock_unittest.cpp     |   52 +
 storage/tests/StorageManager_unittest.cpp       |   42 +
 third_party/iwyu/iwyu_helper.py                 |    8 +-
 transaction/AccessMode.hpp                      |   76 +-
 transaction/CMakeLists.txt                      |   59 +-
 transaction/CycleDetector.cpp                   |  120 +
 transaction/CycleDetector.hpp                   |   83 +
 transaction/DeadLockDetector.cpp                |  177 ++
 transaction/DeadLockDetector.hpp                |  156 +
 transaction/DirectedGraph.hpp                   |   56 +-
 transaction/LockManager.cpp                     |  237 ++
 transaction/LockManager.hpp                     |  128 +
 transaction/LockTable.cpp                       |   22 +-
 transaction/LockTable.hpp                       |   36 +-
 transaction/ResourceId.hpp                      |    9 +-
 transaction/StronglyConnectedComponents.cpp     |    1 -
 transaction/Transaction.cpp                     |   48 -
 transaction/TransactionTable.cpp                |    8 +-
 transaction/TransactionTable.hpp                |   11 +-
 transaction/tests/AccessMode_unittest.cpp       |   12 +-
 transaction/tests/CycleDetector_unittest.cpp    |  157 +
 transaction/tests/DeadLockDetector_unittest.cpp |   96 +
 transaction/tests/DirectedGraph_unittest.cpp    |  113 +-
 transaction/tests/LockRequest_unittest.cpp      |    4 +-
 transaction/tests/LockTable_unittest.cpp        |   63 +-
 transaction/tests/Lock_unittest.cpp             |   13 +-
 .../StronglyConnectedComponents_unittest.cpp    |   15 +-
 transaction/tests/TransactionTable_unittest.cpp |  102 +-
 types/operations/Operation.proto                |    9 +
 .../operations/unary_operations/CMakeLists.txt  |   22 +
 .../unary_operations/SubstringOperation.cpp     |  214 ++
 .../unary_operations/SubstringOperation.hpp     |  234 ++
 .../unary_operations/UnaryOperation.cpp         |    4 +
 .../unary_operations/UnaryOperationFactory.cpp  |   12 +
 .../unary_operations/UnaryOperationID.cpp       |    6 +-
 .../unary_operations/UnaryOperationID.hpp       |    3 +
 utility/BloomFilter.hpp                         |  198 +-
 utility/BloomFilter.proto                       |   30 +
 utility/CMakeLists.txt                          |   31 +
 utility/PlanVisualizer.cpp                      |  161 +
 utility/PlanVisualizer.hpp                      |   94 +
 utility/ScopedBuffer.hpp                        |   19 +-
 utility/ShardedLockManager.hpp                  |   71 +-
 utility/StringUtil.cpp                          |   14 +
 utility/StringUtil.hpp                          |   48 +-
 validate_cmakelists.py                          |    5 +-
 180 files changed, 12093 insertions(+), 3286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/CMakeLists.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/cli/CommandExecutor.cpp
----------------------------------------------------------------------
diff --cc cli/CommandExecutor.cpp
index f38121f,ddcd38f..cfb8c5b
--- a/cli/CommandExecutor.cpp
+++ b/cli/CommandExecutor.cpp
@@@ -28,15 -29,28 +29,30 @@@
  #include "catalog/CatalogDatabase.hpp"
  #include "catalog/CatalogRelation.hpp"
  #include "catalog/CatalogRelationSchema.hpp"
+ #include "cli/DropRelation.hpp"
  #include "cli/PrintToScreen.hpp"
  #include "parser/ParseStatement.hpp"
+ #include "parser/ParseString.hpp"
+ #include "parser/SqlParserWrapper.hpp"
+ #include "query_execution/Foreman.hpp"
+ #include "query_optimizer/QueryHandle.hpp"
+ #include "query_optimizer/QueryPlan.hpp"
+ #include "query_optimizer/QueryProcessor.hpp"
+ #include "storage/StorageBlock.hpp"
+ #include "storage/StorageBlockInfo.hpp"
+ #include "storage/StorageManager.hpp"
+ #include "storage/TupleIdSequence.hpp"
+ #include "storage/TupleStorageSubBlock.hpp"
+ #include "types/Type.hpp"
+ #include "types/TypeID.hpp"
+ #include "types/TypedValue.hpp"
  #include "utility/PtrVector.hpp"
- #include "utility/Macros.hpp"
  #include "utility/SqlError.hpp"
  
- #include "gflags/gflags.h"
  #include "glog/logging.h"
  
++#include "tmb/id_typedefs.h"
++
  using std::fprintf;
  using std::fputc;
  using std::fputs;
@@@ -44,6 -58,6 +60,8 @@@ using std::size_t
  using std::string;
  using std::vector;
  
++namespace tmb { class MessageBus; }
++
  namespace quickstep {
  namespace cli {
  namespace {
@@@ -162,10 -191,130 +195,143 @@@ void executeDescribeTable
    }
  }
  
+ /**
+  * @brief A helper function that executes a SQL query to obtain a scalar result.
+  */
 -inline TypedValue executeQueryForSingleResult(const std::string &query_string,
 -                                               StorageManager *storage_manager,
 -                                               QueryProcessor *query_processor,
 -                                               SqlParserWrapper *parser_wrapper,
 -                                               Foreman *foreman) {
++inline TypedValue executeQueryForSingleResult(
++    const tmb::client_id main_thread_client_id,
++    const tmb::client_id foreman_client_id,
++    const std::string &query_string,
++    tmb::MessageBus *bus,
++    StorageManager *storage_manager,
++    QueryProcessor *query_processor,
++    SqlParserWrapper *parser_wrapper) {
+   parser_wrapper->feedNextBuffer(new std::string(query_string));
+ 
+   ParseResult result = parser_wrapper->getNextStatement();
+   DCHECK(result.condition == ParseResult::kSuccess);
+ 
+   // Generate the query plan.
+   std::unique_ptr<QueryHandle> query_handle(
+       query_processor->generateQueryHandle(*result.parsed_statement));
+   DCHECK(query_handle->getQueryPlanMutable() != nullptr);
+ 
+   // Use foreman to execute the query plan.
 -  foreman->setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
++  /*foreman->setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
+   foreman->reconstructQueryContextFromProto(query_handle->getQueryContextProto());
+ 
+   foreman->start();
 -  foreman->join();
++  foreman->join();*/
++  QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
++      main_thread_client_id, foreman_client_id, query_handle.get(), bus);
+ 
+   // Retrieve the scalar result from the result relation.
+   const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();
+   DCHECK(query_result_relation != nullptr);
+ 
+   TypedValue value;
+   {
+     std::vector<block_id> blocks = query_result_relation->getBlocksSnapshot();
+     DCHECK_EQ(1u, blocks.size());
+     BlockReference block = storage_manager->getBlock(blocks[0], *query_result_relation);
+     const TupleStorageSubBlock &tuple_store = block->getTupleStorageSubBlock();
+     DCHECK_EQ(1, tuple_store.numTuples());
+     DCHECK_EQ(1u, tuple_store.getRelation().size());
+ 
+     if (tuple_store.isPacked()) {
+       value = tuple_store.getAttributeValueTyped(0, 0);
+     } else {
+       std::unique_ptr<TupleIdSequence> existence_map(tuple_store.getExistenceMap());
+       value = tuple_store.getAttributeValueTyped(*existence_map->begin(), 0);
+     }
+     value.ensureNotReference();
+   }
+ 
+   // Drop the result relation.
+   DropRelation::Drop(*query_result_relation,
+                      query_processor->getDefaultDatabase(),
+                      query_processor->getStorageManager());
+ 
+   return value;
+ }
+ 
 -void executeAnalyze(QueryProcessor *query_processor,
 -                    Foreman *foreman,
++void executeAnalyze(const tmb::client_id main_thread_client_id,
++                    const tmb::client_id foreman_client_id,
++                    MessageBus *bus,
++                    QueryProcessor *query_processor,
+                     FILE *out) {
+   const CatalogDatabase &database = *query_processor->getDefaultDatabase();
+   StorageManager *storage_manager = query_processor->getStorageManager();
+ 
+   std::unique_ptr<SqlParserWrapper> parser_wrapper(new SqlParserWrapper());
+   std::vector<std::reference_wrapper<const CatalogRelation>> relations(
+       database.begin(), database.end());
+ 
+   // Analyze each relation in the database.
+   for (const CatalogRelation &relation : relations) {
+     fprintf(out, "Analyzing %s ... ", relation.getName().c_str());
+     fflush(out);
+ 
+     CatalogRelation *mutable_relation =
+         query_processor->getDefaultDatabase()->getRelationByIdMutable(relation.getID());
+ 
+     // Get the number of distinct values for each column.
+     for (const CatalogAttribute &attribute : relation) {
+       std::string query_string = "SELECT COUNT(DISTINCT ";
+       query_string.append(attribute.getName());
+       query_string.append(") FROM ");
+       query_string.append(relation.getName());
+       query_string.append(";");
+ 
+       TypedValue num_distinct_values =
 -          executeQueryForSingleResult(query_string,
++          executeQueryForSingleResult(main_thread_client_id,
++                                      foreman_client_id,
++                                      query_string,
++                                      bus,
+                                       storage_manager,
+                                       query_processor,
 -                                      parser_wrapper.get(),
 -                                      foreman);
++                                      parser_wrapper.get());
+ 
+       DCHECK(num_distinct_values.getTypeID() == TypeID::kLong);
+       mutable_relation->getStatisticsMutable()->setNumDistinctValues(
+           attribute.getID(),
+           num_distinct_values.getLiteral<std::int64_t>());
+     }
+ 
+     // Get the number of tuples for the relation.
+     std::string query_string = "SELECT COUNT(*) FROM ";
+     query_string.append(relation.getName());
+     query_string.append(";");
+ 
+     TypedValue num_tuples =
 -        executeQueryForSingleResult(query_string,
++        executeQueryForSingleResult(main_thread_client_id,
++                                    foreman_client_id,
++                                    query_string,
++                                    bus,
+                                     storage_manager,
+                                     query_processor,
 -                                    parser_wrapper.get(),
 -                                    foreman);
++                                    parser_wrapper.get());
+ 
+     DCHECK(num_tuples.getTypeID() == TypeID::kLong);
+     mutable_relation->getStatisticsMutable()->setNumTuples(
+         num_tuples.getLiteral<std::int64_t>());
+ 
+     fprintf(out, "done\n");
+     fflush(out);
+   }
+   query_processor->markCatalogAltered();
+   query_processor->saveCatalog();
+ }
+ 
  }  // namespace
  
  void executeCommand(const ParseStatement &statement,
                      const CatalogDatabase &catalog_database,
++                    const tmb::client_id main_thread_client_id,
++                    const tmb::client_id foreman_client_id,
++                    MessageBus *bus,
+                     StorageManager *storage_manager,
+                     QueryProcessor *query_processor,
 -                    Foreman *foreman,
                      FILE *out) {
    const ParseCommand &command = static_cast<const ParseCommand &>(statement);
    const PtrVector<ParseString> *arguments = command.arguments();
@@@ -178,6 -327,8 +344,9 @@@
      } else {
        executeDescribeTable(arguments, catalog_database, out);
      }
+   } else if (command_str == C::kAnalyzeCommand) {
 -    executeAnalyze(query_processor, foreman, out);
++    executeAnalyze(
++        main_thread_client_id, foreman_client_id, bus, query_processor, out);
    } else {
      THROW_SQL_ERROR_AT(command.command()) << "Invalid Command";
    }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/cli/CommandExecutor.hpp
----------------------------------------------------------------------
diff --cc cli/CommandExecutor.hpp
index 21eee6a,c819981..3435aeb
--- a/cli/CommandExecutor.hpp
+++ b/cli/CommandExecutor.hpp
@@@ -21,13 -21,10 +21,14 @@@
  #include <cstdio>
  #include <string>
  
- #include "parser/ParseStatement.hpp"
- #include "utility/Macros.hpp"
++#include "tmb/id_typedefs.h"
 +
  using std::fprintf;
  using std::fputc;
  using std::string;
  
++namespace tmb { class MessageBus; }
++
  namespace quickstep {
  
  class CatalogDatabase;
@@@ -49,13 -50,19 +54,24 @@@ constexpr char kAnalyzeCommand[] = "\\a
  
  /**
    * @brief Executes the command by calling the command handler.
-   *        
+   *
    * @param statement The parsed statement from the cli.
    * @param catalog_database The catalog information about the current database.
-   * @param out The stream where the output of the command has to be redirected to.      
++  * @param main_thread_client_id The TMB client ID of the main thread.
++  * @param foreman_client_id The TMB client ID of the Foreman thread.
++  * @param bus A pointer to the TMB.
+   * @param storage_manager The current StorageManager.
+   * @param query_processor The query processor to generate plans for SQL queries.
+   * @param foreman The foreman to execute query plans.
+   * @param out The stream where the output of the command has to be redirected to.
  */
  void executeCommand(const ParseStatement &statement,
                      const CatalogDatabase &catalog_database,
++                    const tmb::client_id main_thread_client_id,
++                    const tmb::client_id foreman_client_id,
++                    tmb::MessageBus *bus,
+                     StorageManager *storage_manager,
+                     QueryProcessor *query_processor,
 -                    Foreman *foreman,
                      FILE *out);
  
  /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --cc cli/QuickstepCli.cpp
index 66e58fe,558d6eb..6f954fe
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@@ -320,12 -323,7 +326,13 @@@ int main(int argc, char* argv[]) 
                                     worker_client_ids,
                                     worker_numa_nodes);
  
 -  foreman.setWorkerDirectory(&worker_directory);
 +  Foreman foreman(main_thread_client_id,
 +                  &worker_directory,
 +                  &bus,
 +                  query_processor->getDefaultDatabase(),
 +                  query_processor->getStorageManager(),
-                   num_numa_nodes_covered);
++                  -1,
++                  num_numa_nodes_system);
  
    // Start the worker threads.
    for (Worker &worker : workers) {
@@@ -365,7 -365,11 +376,13 @@@
            try {
              quickstep::cli::executeCommand(
                  *result.parsed_statement,
-                 *(query_processor->getDefaultDatabase()), stdout);
+                 *(query_processor->getDefaultDatabase()),
++                main_thread_client_id,
++                foreman.getBusClientID(),
++                &bus,
+                 query_processor->getStorageManager(),
+                 query_processor.get(),
 -                &foreman,
+                 stdout);
            } catch (const quickstep::SqlError &sql_error) {
              fprintf(stderr, "%s",
                      sql_error.formatMessage(*command_string).c_str());
@@@ -385,20 -389,14 +402,25 @@@
          }
  
          DCHECK(query_handle->getQueryPlanMutable() != nullptr);
-         AdmitRequestMessage request_message(query_handle.get());
-         TaggedMessage admit_tagged_message(
-             &request_message, sizeof(request_message), kAdmitRequestMessage);
 -        foreman.setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
 -
 -        foreman.reconstructQueryContextFromProto(query_handle->getQueryContextProto());
--
++        // TODO(harshad) - In the future when queries are not admitted
++        // immediately, calculate their waiting time separately.
++        LOG(INFO) << "Address of query handle in QuickstepCli: " << query_handle.get();
 +        start = std::chrono::steady_clock::now();
-         QueryExecutionUtil::SendTMBMessage(&bus,
-                                            main_thread_client_id,
-                                            foreman.getBusClientID(),
-                                            std::move(admit_tagged_message));
++        const tmb::MessageBus::SendStatus send_status =
++            QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
++                main_thread_client_id,
++                foreman.getBusClientID(),
++                query_handle.get(),
++                &bus);
++        if (send_status != tmb::MessageBus::SendStatus::kOK) {
++          fprintf(stderr, "\nQuery %s could not be admitted to the system\n", command_string->c_str());
++          continue;
++        }
          try {
 -          start = std::chrono::steady_clock::now();
 -          foreman.start();
 -          foreman.join();
 +          const AnnotatedMessage annotated_msg =
 +              bus.Receive(main_thread_client_id, 0, true);
 +          const TaggedMessage &tagged_message = annotated_msg.tagged_message;
 +          DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
            end = std::chrono::steady_clock::now();
  
            const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/cli/tests/CommandExecutorTestRunner.cpp
----------------------------------------------------------------------
diff --cc cli/tests/CommandExecutorTestRunner.cpp
index f5c37c3,9cd493e..794f7e1
--- a/cli/tests/CommandExecutorTestRunner.cpp
+++ b/cli/tests/CommandExecutorTestRunner.cpp
@@@ -92,6 -87,9 +92,11 @@@ void CommandExecutorTestRunner::runTest
            quickstep::cli::executeCommand(
                *result.parsed_statement,
                *(test_database_loader_.catalog_database()),
++              main_thread_client_id_,
++              foreman_->getBusClientID(),
++              &bus_,
+               test_database_loader_.storage_manager(),
+               nullptr,
 -              nullptr,
                output_stream.file());
          } else  {
            QueryHandle query_handle(optimizer_context.query_id());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --cc query_execution/AdmitRequestMessage.hpp
index e33b354,0000000..e2a1077
mode 100644,000000..100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@@ -1,73 -1,0 +1,75 @@@
 +/**
 + *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
 + *     University of Wisconsin—Madison.
 + *
 + *   Licensed under the Apache License, Version 2.0 (the "License");
 + *   you may not use this file except in compliance with the License.
 + *   You may obtain a copy of the License at
 + *
 + *       http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *   Unless required by applicable law or agreed to in writing, software
 + *   distributed under the License is distributed on an "AS IS" BASIS,
 + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *   See the License for the specific language governing permissions and
 + *   limitations under the License.
 + **/
 +
 +#ifndef QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
 +#define QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
 +
 +#include <vector>
 +
 +#include "utility/Macros.hpp"
 +
 +namespace quickstep {
 +
 +class QueryHandle;
 +
 +/** \addtogroup QueryExecution
 + *  @{
 + */
 +
 +/**
 + * @brief A message requesting a query or queries to be admitted to the system.
 + **/
 +class AdmitRequestMessage {
 + public:
 +  /**
 +   * @brief Constructor.
 +   *
 +   * @param query_handles The handles of the queries requesting to be admitted
 +   *        to the system.
 +   **/
 +  explicit AdmitRequestMessage(const std::vector<QueryHandle*> &query_handles)
 +      : query_handles_(query_handles) {}
 +
 +  /**
 +   * @brief Constructor for requesting single query admission.
 +   *
 +   * @param query_handle The handle of the query requesting to be admitted.
 +   **/
 +  explicit AdmitRequestMessage(QueryHandle *query_handle) {
 +    query_handles_.push_back(query_handle);
 +  }
 +
 +  /**
 +   * @brief Get the query handles from this message.
 +   **/
 +  const std::vector<QueryHandle*>& getQueryHandles() const {
++    LOG(INFO) << "Query handle in getQueryHandles(): " << query_handles_.front()
++      << " [0] " << query_handles_[0];
 +    return query_handles_;
 +  }
 +
 + private:
 +  std::vector<QueryHandle*> query_handles_;
 +
 +  DISALLOW_COPY_AND_ASSIGN(AdmitRequestMessage);
 +};
 +
 +/** @} */
 +
 +}  // namespace quickstep
 +
 +#endif  // QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --cc query_execution/CMakeLists.txt
index 9658c08,7d9d601..c8c8161
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@@ -20,10 -20,11 +20,13 @@@ QS_PROTOBUF_GENERATE_CPP(queryexecution
                           QueryExecutionMessages.proto)
  
  # Declare micro-libs:
 +add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
+ if (ENABLE_DISTRIBUTED)
+   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
+ endif()
  add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
  add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
 +add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
  add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
  add_library(quickstep_queryexecution_QueryContext_proto
              ${queryexecution_QueryContext_proto_srcs}
@@@ -42,20 -43,43 +45,33 @@@ add_library(quickstep_queryexecution_Wo
  add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
  
  # Link dependencies:
 +target_link_libraries(quickstep_queryexecution_AdmitRequestMessage
 +                      quickstep_utility_Macros)
+ if (ENABLE_DISTRIBUTED)
+   target_link_libraries(quickstep_queryexecution_BlockLocator
+                         glog
+                         quickstep_queryexecution_QueryExecutionMessages_proto
+                         quickstep_queryexecution_QueryExecutionTypedefs
+                         quickstep_queryexecution_QueryExecutionUtil
+                         quickstep_storage_StorageBlockInfo
+                         quickstep_storage_StorageConstants
+                         quickstep_threading_Thread
+                         quickstep_threading_ThreadUtil
+                         quickstep_utility_Macros
+                         tmb)
+ endif()
  target_link_libraries(quickstep_queryexecution_Foreman
 +                      gflags_nothreads-static
                        glog
 -                      gtest
 -                      quickstep_catalog_CatalogDatabase
 -                      quickstep_catalog_CatalogRelation
 -                      quickstep_catalog_CatalogTypedefs
 -                      quickstep_catalog_PartitionScheme
 +                      quickstep_queryexecution_AdmitRequestMessage
                        quickstep_queryexecution_ForemanLite
 -                      quickstep_queryexecution_QueryContext
 -                      quickstep_queryexecution_QueryExecutionMessages_proto
 -                      quickstep_queryexecution_QueryExecutionState
 +                      quickstep_queryexecution_PolicyEnforcer
                        quickstep_queryexecution_QueryExecutionTypedefs
                        quickstep_queryexecution_QueryExecutionUtil
 -                      quickstep_queryexecution_WorkOrdersContainer
                        quickstep_queryexecution_WorkerDirectory
                        quickstep_queryexecution_WorkerMessage
 -                      quickstep_relationaloperators_RebuildWorkOrder
 -                      quickstep_relationaloperators_RelationalOperator
 -                      quickstep_relationaloperators_WorkOrder
 -                      quickstep_storage_InsertDestination
 -                      quickstep_storage_StorageBlock
 -                      quickstep_storage_StorageBlockInfo
                        quickstep_threading_ThreadUtil
 -                      quickstep_utility_DAG
 +                      quickstep_utility_EqualsAnyConstant
                        quickstep_utility_Macros
                        tmb)
  target_link_libraries(quickstep_queryexecution_ForemanLite
@@@ -112,8 -128,7 +130,9 @@@ target_link_libraries(quickstep_queryex
                        quickstep_threading_ThreadIDBasedMap
                        tmb)
  target_link_libraries(quickstep_queryexecution_QueryExecutionUtil
++                      quickstep_queryexecution_AdmitRequestMessage
                        quickstep_queryexecution_QueryExecutionTypedefs
 +                      quickstep_queryexecution_WorkerMessage
                        quickstep_utility_Macros
                        tmb)
  target_link_libraries(quickstep_queryexecution_QueryManager
@@@ -179,7 -192,67 +198,37 @@@ target_link_libraries(quickstep_queryex
                        quickstep_queryexecution_WorkerDirectory
                        quickstep_queryexecution_WorkerMessage
                        quickstep_queryexecution_WorkerSelectionPolicy)
+ if (ENABLE_DISTRIBUTED)
+   target_link_libraries(quickstep_queryexecution
+                         quickstep_queryexecution_BlockLocator)
+ endif()
+ 
  # Tests:
+ if (ENABLE_DISTRIBUTED)
+   add_executable(BlockLocator_unittest
+                  "${CMAKE_CURRENT_SOURCE_DIR}/tests/BlockLocator_unittest.cpp")
+   target_link_libraries(BlockLocator_unittest
+                         gflags_nothreads-static
+                         glog
+                         gtest
+                         quickstep_catalog_CatalogAttribute
+                         quickstep_catalog_CatalogRelation
+                         quickstep_queryexecution_BlockLocator
+                         quickstep_queryexecution_QueryExecutionMessages_proto
+                         quickstep_queryexecution_QueryExecutionTypedefs
+                         quickstep_queryexecution_QueryExecutionUtil
+                         quickstep_storage_StorageBlob
+                         quickstep_storage_StorageBlock
+                         quickstep_storage_StorageBlockInfo
+                         quickstep_storage_StorageConstants
+                         quickstep_storage_StorageManager
+                         quickstep_types_TypeFactory
+                         quickstep_types_TypeID
+                         tmb
+                         ${LIBS})
+   add_test(BlockLocator_unittest BlockLocator_unittest)
+ endif()
+ 
 -add_executable(Foreman_unittest
 -  "${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp")
 -target_link_libraries(Foreman_unittest
 -                      glog
 -                      gtest
 -                      gtest_main
 -                      quickstep_catalog_CatalogDatabase
 -                      quickstep_catalog_CatalogRelation
 -                      quickstep_catalog_CatalogTypedefs
 -                      quickstep_queryexecution_Foreman
 -                      quickstep_queryexecution_QueryContext
 -                      quickstep_queryexecution_QueryContext_proto
 -                      quickstep_queryexecution_QueryExecutionState
 -                      quickstep_queryexecution_QueryExecutionTypedefs
 -                      quickstep_queryexecution_WorkOrdersContainer
 -                      quickstep_queryexecution_WorkerDirectory
 -                      quickstep_queryexecution_WorkerMessage
 -                      quickstep_queryoptimizer_QueryPlan
 -                      quickstep_relationaloperators_RelationalOperator
 -                      quickstep_relationaloperators_WorkOrder
 -                      quickstep_storage_InsertDestination
 -                      quickstep_storage_InsertDestination_proto
 -                      quickstep_storage_StorageBlock
 -                      quickstep_storage_StorageBlockInfo
 -                      quickstep_storage_StorageManager
 -                      quickstep_utility_DAG
 -                      quickstep_utility_Macros
 -                      tmb)
 -add_test(Foreman_unittest Foreman_unittest)
 -
  add_executable(QueryManager_unittest
    "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
  target_link_libraries(QueryManager_unittest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --cc query_execution/Foreman.cpp
index 3609120,304c429..6cec70a
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@@ -97,40 -58,161 +97,43 @@@ void Foreman::run() 
      // We can pin the foreman thread to a CPU if specified.
      ThreadUtil::BindToCPU(cpu_id_);
    }
 -  initializeState();
 -
 -  DEBUG_ASSERT(query_dag_ != nullptr);
 -  const dag_node_index dag_size = query_dag_->size();
 -
 -  // Collect all the workorders from all the relational operators in the DAG.
 -  for (dag_node_index index = 0; index < dag_size; ++index) {
 -    if (checkAllBlockingDependenciesMet(index)) {
 -      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
 -      processOperator(index, false);
 -    }
 -  }
 -
 -  // Dispatch the WorkOrders generated so far.
 -  dispatchWorkerMessages(0, 0);
 -}
 -
 -void Foreman::processWorkOrderCompleteMessage(const dag_node_index op_index,
 -                                              const size_t worker_thread_index) {
 -  query_exec_state_->decrementNumQueuedWorkOrders(op_index);
 -
 -  // As the given worker finished executing a WorkOrder, decrement its number
 -  // of queued WorkOrders.
 -  workers_->decrementNumQueuedWorkOrders(worker_thread_index);
 -
 -  // Check if new work orders are available and fetch them if so.
 -  fetchNormalWorkOrders(op_index);
 -
 -  if (checkRebuildRequired(op_index)) {
 -    if (checkNormalExecutionOver(op_index)) {
 -      if (!checkRebuildInitiated(op_index)) {
 -        if (initiateRebuild(op_index)) {
 -          // Rebuild initiated and completed right away.
 -          markOperatorFinished(op_index);
 -        } else {
 -          // Rebuild under progress.
 -        }
 -      } else if (checkRebuildOver(op_index)) {
 -        // Rebuild was under progress and now it is over.
 -        markOperatorFinished(op_index);
 -      }
 -    } else {
 -      // Normal execution under progress for this operator.
 -    }
 -  } else if (checkOperatorExecutionOver(op_index)) {
 -    // Rebuild not required for this operator and its normal execution is
 -    // complete.
 -    markOperatorFinished(op_index);
 -  }
 -
 -  for (const pair<dag_node_index, bool> &dependent_link :
 -       query_dag_->getDependents(op_index)) {
 -    const dag_node_index dependent_op_index = dependent_link.first;
 -    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
 -      // Process the dependent operator (of the operator whose WorkOrder
 -      // was just executed) for which all the dependencies have been met.
 -      processOperator(dependent_op_index, true);
 -    }
 -  }
 -
 -  // Dispatch the WorkerMessages to the workers. We prefer to start the search
 -  // for the schedulable WorkOrders beginning from 'op_index'. The first
 -  // candidate worker to receive the next WorkOrder is the one that sent the
 -  // response message to Foreman.
 -  dispatchWorkerMessages(worker_thread_index, op_index);
 -}
 -
 -void Foreman::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
 -                                                     const size_t worker_thread_index) {
 -  query_exec_state_->decrementNumRebuildWorkOrders(op_index);
 -  workers_->decrementNumQueuedWorkOrders(worker_thread_index);
 -
 -  if (checkRebuildOver(op_index)) {
 -    markOperatorFinished(op_index);
 -
 -    for (const pair<dag_node_index, bool> &dependent_link :
 -         query_dag_->getDependents(op_index)) {
 -      const dag_node_index dependent_op_index = dependent_link.first;
 -      if (checkAllBlockingDependenciesMet(dependent_op_index)) {
 -        processOperator(dependent_op_index, true);
 -      }
 -    }
 -  }
 -
 -  // Dispatch the WorkerMessages to the workers. We prefer to start the search
 -  // for the schedulable WorkOrders beginning from 'op_index'. The first
 -  // candidate worker to receive the next WorkOrder is the one that sent the
 -  // response message to Foreman.
 -  dispatchWorkerMessages(worker_thread_index, op_index);
 -}
 -
 -void Foreman::processDataPipelineMessage(const dag_node_index op_index,
 -                                         const block_id block,
 -                                         const relation_id rel_id) {
 -  for (const dag_node_index consumer_index :
 -       output_consumers_[op_index]) {
 -    // Feed the streamed block to the consumer. Note that 'output_consumers_'
 -    // only contain those dependents of operator with index = op_index which are
 -    // eligible to receive streamed input.
 -    query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
 -    // Because of the streamed input just fed, check if there are any new
 -    // WorkOrders available and if so, fetch them.
 -    fetchNormalWorkOrders(consumer_index);
 -  }
 -
 -  // Dispatch the WorkerMessages to the workers. We prefer to start the search
 -  // for the schedulable WorkOrders beginning from 'op_index'. The first
 -  // candidate worker to receive the next WorkOrder is the one that sent the
 -  // response message to Foreman.
 -  // TODO(zuyu): Improve the data locality for the next WorkOrder.
 -  dispatchWorkerMessages(0, op_index);
 -}
 -
 -void Foreman::processFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
 -  RelationalOperator *op =
 -      query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
 -  op->receiveFeedbackMessage(msg);
 -}
 -
 -void Foreman::run() {
 -  // Initialize before for Foreman eventloop.
 -  initialize();
  
    // Event loop
 -  while (!query_exec_state_->hasQueryExecutionFinished()) {
 +  for (;;) {
      // Receive() causes this thread to sleep until next message is received.
 -    AnnotatedMessage annotated_msg = bus_->Receive(foreman_client_id_, 0, true);
 +    const AnnotatedMessage annotated_msg =
 +        bus_->Receive(foreman_client_id_, 0, true);
      const TaggedMessage &tagged_message = annotated_msg.tagged_message;
 -    switch (tagged_message.message_type()) {
 -      case kWorkOrderCompleteMessage: {
 -        serialization::WorkOrderCompletionMessage proto;
 -        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 -
 -        processWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
 -        break;
 -      }
 -      case kRebuildWorkOrderCompleteMessage: {
 -        serialization::WorkOrderCompletionMessage proto;
 -        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 -
 -        processRebuildWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
 +    const tmb::message_type_id message_type = tagged_message.message_type();
 +    switch (message_type) {
 +      case kCatalogRelationNewBlockMessage:  // Fall through
 +      case kDataPipelineMessage:
 +      case kRebuildWorkOrderCompleteMessage:
 +      case kWorkOrderCompleteMessage:
 +      case kWorkOrderFeedbackMessage:
 +      case kWorkOrdersAvailableMessage: {
 +        policy_enforcer_->processMessage(tagged_message);
          break;
        }
 -      case kCatalogRelationNewBlockMessage: {
 -        serialization::CatalogRelationNewBlockMessage proto;
 -        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 -
 -        const block_id block = proto.block_id();
 -
 -        CatalogRelation *relation =
 -            static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
 -        relation->addBlock(block);
 -
 -        if (proto.has_partition_id()) {
 -          relation->getPartitionSchemeMutable()->addBlockToPartition(proto.partition_id(), block);
 +      case kAdmitRequestMessage: {
 +        const AdmitRequestMessage *msg =
 +            static_cast<const AdmitRequestMessage *>(tagged_message.message());
 +        const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
 +
++        LOG(INFO) << "Address of query handle in foreman front: " << query_handles.front() <<
++            " [0]: " << query_handles[0];
 +        DCHECK(!query_handles.empty());
 +        bool all_queries_admitted = true;
 +        if (query_handles.size() == 1u) {
++          LOG(INFO) << "Address of query handle in foreman: " << query_handles.front();
 +          all_queries_admitted =
 +              policy_enforcer_->admitQuery(query_handles.front());
 +        } else {
 +          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
 +        }
 +        if (!all_queries_admitted) {
 +          LOG(WARNING) << "The scheduler could not admit all the queries";
 +          // TODO(harshad) - Inform the main thread about the failure.
          }
          break;
        }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --cc query_execution/PolicyEnforcer.cpp
index 1ee1df9,0000000..15b90a1
mode 100644,000000..100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@@ -1,177 -1,0 +1,180 @@@
 +/**
 + *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
 + *     University of Wisconsin—Madison.
 + *
 + *   Licensed under the Apache License, Version 2.0 (the "License");
 + *   you may not use this file except in compliance with the License.
 + *   You may obtain a copy of the License at
 + *
 + *       http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *   Unless required by applicable law or agreed to in writing, software
 + *   distributed under the License is distributed on an "AS IS" BASIS,
 + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *   See the License for the specific language governing permissions and
 + *   limitations under the License.
 + **/
 +
 +#include "query_execution/PolicyEnforcer.hpp"
 +
 +#include <cstddef>
 +#include <memory>
 +#include <queue>
 +#include <utility>
 +#include <unordered_map>
 +#include <vector>
 +
 +#include "query_execution/QueryExecutionMessages.pb.h"
 +#include "query_execution/QueryManager.hpp"
 +#include "query_optimizer/QueryHandle.hpp"
 +#include "relational_operators/WorkOrder.hpp"
 +
 +#include "glog/logging.h"
 +
 +namespace quickstep {
 +
 +bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
 +  if (admitted_queries_.size() < kMaxConcurrentQueries) {
 +    // Ok to admit the query.
 +    const std::size_t query_id = query_handle->query_id();
 +    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
-       admitted_queries_[query_id].reset(
-           new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
-                            catalog_database_, storage_manager_, bus_));
++      admitted_queries_[query_id].reset(new QueryManager(foreman_client_id_,
++                                                         num_numa_nodes_,
++                                                         query_handle,
++                                                         catalog_database_,
++                                                         storage_manager_,
++                                                         bus_));
 +      return true;
 +    } else {
 +      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
 +      return false;
 +    }
 +  } else {
 +    // This query will have to wait.
 +    waiting_queries_.push(query_handle);
 +    return false;
 +  }
 +}
 +
 +void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
 +  // TODO(harshad) : Provide processXMessage() public functions in
 +  // QueryManager, so that we need to extract message from the
 +  // TaggedMessage only once.
 +  std::size_t query_id;
 +  switch (tagged_message.message_type()) {
 +    case kWorkOrderCompleteMessage:  // Fall through.
 +    case kRebuildWorkOrderCompleteMessage: {
 +      serialization::WorkOrderCompletionMessage proto;
 +      CHECK(proto.ParseFromArray(tagged_message.message(),
 +                                 tagged_message.message_bytes()));
 +      query_id = proto.query_id();
 +      break;
 +    }
 +    case kCatalogRelationNewBlockMessage: {
 +      serialization::CatalogRelationNewBlockMessage proto;
 +      CHECK(proto.ParseFromArray(tagged_message.message(),
 +                                 tagged_message.message_bytes()));
 +      query_id = proto.query_id();
 +      break;
 +    }
 +    case kDataPipelineMessage: {
 +      serialization::DataPipelineMessage proto;
 +      CHECK(proto.ParseFromArray(tagged_message.message(),
 +                                 tagged_message.message_bytes()));
 +      query_id = proto.query_id();
 +      break;
 +    }
 +    case kWorkOrdersAvailableMessage: {
 +      serialization::WorkOrdersAvailableMessage proto;
 +      CHECK(proto.ParseFromArray(tagged_message.message(),
 +                                 tagged_message.message_bytes()));
 +      query_id = proto.query_id();
 +      break;
 +    }
 +    case kWorkOrderFeedbackMessage: {
 +      // TODO(harshad) Add query ID to FeedbackMessage.
 +      WorkOrder::FeedbackMessage msg(const_cast<void *>(tagged_message.message()), tagged_message.message_bytes());
 +      query_id = msg.header().query_id;
 +      break;
 +    }
 +    default:
 +      LOG(FATAL) << "Unknown message type found in PolicyEnforcer";
 +  }
 +  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
 +  const QueryManager::QueryStatusCode return_code =
 +      admitted_queries_[query_id]->processMessage(tagged_message);
 +  if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
 +    removeQuery(query_id);
 +    if (!waiting_queries_.empty()) {
 +      // Admit the earliest waiting query.
 +      QueryHandle *new_query = waiting_queries_.front();
 +      waiting_queries_.pop();
 +      admitQuery(new_query);
 +    }
 +  }
 +}
 +
 +void PolicyEnforcer::getWorkerMessages(
 +    std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
 +  // Iterate over admitted queries until either there are no more
 +  // messages available, or the maximum number of messages has
 +  // been collected.
 +  DCHECK(worker_messages->empty());
 +  // TODO(harshad) - Make this function generic enough so that it
 +  // works well when multiple queries are getting executed.
 +  std::size_t per_query_share = 0;
 +  if (!admitted_queries_.empty()) {
 +    per_query_share = kMaxNumWorkerMessages / admitted_queries_.size();
 +  } else {
 +    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
 +    return;
 +  }
 +  DCHECK_GT(per_query_share, 0u);
 +  std::vector<std::size_t> finished_queries_ids;
 +
 +  for (const auto &admitted_query_info : admitted_queries_) {
 +    QueryManager *curr_query_manager = admitted_query_info.second.get();
 +    DCHECK(curr_query_manager != nullptr);
 +    std::size_t messages_collected_curr_query = 0;
 +    while (messages_collected_curr_query < per_query_share) {
 +      WorkerMessage *next_worker_message =
 +          curr_query_manager->getNextWorkerMessage(0, -1);
 +      if (next_worker_message != nullptr) {
 +        ++messages_collected_curr_query;
 +        worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
 +      } else {
 +        // No more work orders from the current query at this time.
 +        // Check if the query's execution is over.
 +        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
 +          // If the query has been executed, remove it.
 +          finished_queries_ids.push_back(admitted_query_info.first);
 +        }
 +        break;
 +      }
 +    }
 +  }
 +  for (std::size_t finished_qid : finished_queries_ids) {
 +    removeQuery(finished_qid);
 +  }
 +}
 +
 +void PolicyEnforcer::removeQuery(const std::size_t query_id) {
 +  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
 +  if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
 +    LOG(WARNING) << "Removing query with ID " << query_id
 +                 << " that hasn't finished its execution";
 +  }
 +  admitted_queries_.erase(query_id);
 +}
 +
 +bool PolicyEnforcer::admitQueries(
 +    const std::vector<QueryHandle *> &query_handles) {
 +  for (QueryHandle *curr_query : query_handles) {
 +    if (!admitQuery(curr_query)) {
 +      return false;
 +    }
 +  }
 +  return true;
 +}
 +
 +}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryContext.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --cc query_execution/QueryContext.hpp
index e170494,7d5628d..83627bc
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@@ -406,15 -460,9 +460,16 @@@ class QueryContext 
      return update_groups_[id];
    }
  
 +  /**
 +   * @brief Get the ID of this query.
 +   **/
 +  inline const std::size_t getQueryID() const {
 +    return query_id_;
 +  }
 +
   private:
    std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_;
+   std::vector<std::unique_ptr<BloomFilter>> bloom_filters_;
    std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_;
    std::vector<std::unique_ptr<InsertDestination>> insert_destinations_;
    std::vector<std::unique_ptr<JoinHashTable>> join_hash_tables_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --cc query_execution/QueryContext.proto
index 5632482,b37286c..98cd0b6
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@@ -42,16 -43,15 +43,17 @@@ message QueryContext 
    }
  
    repeated AggregationOperationState aggregation_states = 1;
-   repeated HashTable join_hash_tables = 2;
-   repeated InsertDestination insert_destinations = 3;
-   repeated Predicate predicates = 4;
-   repeated ScalarGroup scalar_groups = 5;
-   repeated SortConfiguration sort_configs = 6;
-   repeated Tuple tuples = 7;
-   repeated GeneratorFunctionHandle generator_functions = 8;
+   repeated BloomFilter bloom_filters = 2;
+   repeated GeneratorFunctionHandle generator_functions = 3;
+   repeated HashTable join_hash_tables = 4;
+   repeated InsertDestination insert_destinations = 5;
+   repeated Predicate predicates = 6;
+   repeated ScalarGroup scalar_groups = 7;
+   repeated SortConfiguration sort_configs = 8;
+   repeated Tuple tuples = 9;
  
    // NOTE(zuyu): For UpdateWorkOrder only.
-   repeated UpdateGroup update_groups = 9;
+   repeated UpdateGroup update_groups = 10;
 +
-   required uint64 query_id = 10;
++  required uint64 query_id = 11;
  }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --cc query_execution/QueryExecutionMessages.proto
index f8628fa,15803cf..9d9a9e5
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@@ -44,5 -45,34 +48,35 @@@ message DataPipelineMessage 
  
  message WorkOrdersAvailableMessage {
    required uint64 operator_index = 1;
 +  required uint64 query_id = 2;
  }
+ 
+ // BlockLocator related messages.
+ message BlockDomainRegistrationMessage {
+   // Format IP:Port, i.e., "0.0.0.0:0".
+   required string domain_network_address = 1;
+ }
+ 
+ // Used for RegistrationResponse, Unregistration, and FailureReport.
+ message BlockDomainMessage {
+   required uint32 block_domain = 1;
+ }
+ 
+ // Used when StorageManager loads or evicts a block or a blob from its buffer
+ // pool.
+ message BlockLocationMessage {
+   required fixed64 block_id = 1;
+   required uint32 block_domain = 2;
+ }
+ 
+ message BlockMessage {
+   required fixed64 block_id = 1;
+ }
+ 
+ message LocateBlockResponseMessage {
+   repeated uint32 block_domains = 1;
+ }
+ 
+ message GetPeerDomainNetworkAddressesResponseMessage {
+   repeated string domain_network_addresses = 1;
+ }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --cc query_execution/QueryExecutionTypedefs.hpp
index 6ba792e,fc253bc..fa97158
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@@ -55,7 -56,8 +56,9 @@@ using ClientIDMap = ThreadIDBasedMap<cl
                                       'a',
                                       'p'>;
  
+ // We sort the following message types in the order of a life cycle of a query.
  enum QueryExecutionMessageType : message_type_id {
++  kAdmitRequestMessage,  // Requesting a query (or queries) to be admitted.
    kWorkOrderMessage,  // From Foreman to Worker.
    kWorkOrderCompleteMessage,  // From Worker to Foreman.
    kCatalogRelationNewBlockMessage,  // From InsertDestination to Foreman.
@@@ -65,9 -67,21 +68,22 @@@
                                // their corresponding RelationalOperators.
    kRebuildWorkOrderMessage,  // From Foreman to Worker.
    kRebuildWorkOrderCompleteMessage,  // From Worker to Foreman.
 -  kPoisonMessage,  // From the CLI shell to Foreman, then from Foreman to Workers.
++  kWorkloadCompletionMessage,  // From Foreman to main thread.
 +  kPoisonMessage,  // From the main thread to Foreman and workers.
-   kAdmitRequestMessage,  // Requesting a query (or queries) to be admitted.
-   kWorkloadCompletionMessage  // From Foreman to main thread.
+ 
+ #ifdef QUICKSTEP_DISTRIBUTED
+   // BlockLocator related messages, sorted in a life cycle of StorageManager
+   // with a unique block domain.
+   kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.
+   kBlockDomainRegistrationResponseMessage,  // From BlockLocator to Worker.
+   kAddBlockLocationMessage,  // From StorageManager to BlockLocator.
+   kDeleteBlockLocationMessage,  // From StorageManager to BlockLocator.
+   kLocateBlockMessage,  // From StorageManager to BlockLocator.
+   kLocateBlockResponseMessage,  // From BlockLocator to StorageManager.
+   kGetPeerDomainNetworkAddressesMessage,  // From StorageManager to BlockLocator.
+   kGetPeerDomainNetworkAddressesResponseMessage,  // From BlockLocator to StorageManager.
+   kBlockDomainUnregistrationMessage,  // From StorageManager to BlockLocator.
+ #endif
  };
  
  /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --cc query_execution/QueryExecutionUtil.hpp
index 0cd0a4e,a8b6a38..267bbe6
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@@ -17,11 -17,9 +17,12 @@@
  #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
  #define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_
  
 +#include <memory>
  #include <utility>
  
++#include "query_execution/AdmitRequestMessage.hpp"
  #include "query_execution/QueryExecutionTypedefs.hpp"
 +#include "query_execution/WorkerMessage.hpp"
  #include "utility/Macros.hpp"
  
  #include "tmb/address.h"
@@@ -31,6 -29,6 +32,8 @@@
  
  namespace quickstep {
  
++class QueryHandle;
++
  /**
   * @brief A static class for reusable methods in query_execution module.
   **/
@@@ -62,27 -60,6 +65,67 @@@ class QueryExecutionUtil 
                       std::move(tagged_message));
    }
  
-   static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) {
++  /**
++   * @brief Construct and send an AdmitRequestMessage from a given sender to a
++   *        given recipient.
++   *
++   * @param sender_id The TMB client ID of the sender.
++   * @param receiver_id The TMB client ID of the receiver.
++   * @param query_handle The QueryHandle used in the AdmitRequestMessage.
++   * @param bus A pointer to the TMB.
++   * @param tagged_message A moved from reference to the tagged message.
++   *
++   * @return A status code indicating the result of the message delivery.
++   *         The caller should ensure that the status is SendStatus::kOK.
++   **/
++  static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
++      const tmb::client_id sender_id,
++      const tmb::client_id receiver_id,
++      QueryHandle *query_handle,
++      MessageBus *bus) {
++    LOG(INFO) << "Address of query handle in QExecUtil: " << query_handle;
++    std::unique_ptr<AdmitRequestMessage> request_message(new AdmitRequestMessage(query_handle));
++    const std::vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
++      LOG(INFO) << "Address of query handle in foreman front: " << query_handles.front() << " [0]: " << query_handles[0];
++    const std::size_t size_of_request_msg = sizeof(*request_message);
++    TaggedMessage admit_tagged_message(
++        request_message.release(), size_of_request_msg, kAdmitRequestMessage);
++
++    return QueryExecutionUtil::SendTMBMessage(
++        bus, sender_id, receiver_id, std::move(admit_tagged_message));
++  }
++
++  /**
++   * @brief Broadcast a poison message from a given sender.
++   *
++   * @note This message will be received by all the clients that have registered
++   *       as recipients of the poison message type.
++   *
++   * @param sender_id The TMB client ID of the sender.
++   * @param bus A pointer to the TMB.
++   **/
++  static void BroadcastPoisonMessage(const tmb::client_id sender_id,
++                                     tmb::MessageBus *bus) {
 +    // Terminate all threads.
 +    // The sender thread broadcasts poison message to the workers and foreman.
 +    // Each worker dies after receiving poison message. The order of workers'
 +    // death is irrelevant.
 +    MessageStyle style;
 +    style.Broadcast(true);
 +    Address address;
 +    address.All(true);
 +    std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
 +    TaggedMessage poison_tagged_message(poison_message.get(),
 +                                        sizeof(*poison_message),
 +                                        kPoisonMessage);
 +
 +    const tmb::MessageBus::SendStatus send_status = bus->Send(
 +        sender_id, address, style, std::move(poison_tagged_message));
 +    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
 +       "Broadcast poison message from sender with TMB client ID " << sender_id
 +       << " failed";
 +  }
 +
   private:
    /**
     * @brief Constructor. Made private to avoid instantiation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --cc query_optimizer/ExecutionGenerator.hpp
index 0056cbf,0630bca..9639cf1
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@@ -100,8 -103,8 +103,9 @@@ class ExecutionGenerator 
        : optimizer_context_(DCHECK_NOTNULL(optimizer_context)),
          query_handle_(DCHECK_NOTNULL(query_handle)),
          execution_plan_(DCHECK_NOTNULL(query_handle->getQueryPlanMutable())),
-         query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())) {
+         query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())),
+         execution_heuristics_(new ExecutionHeuristics()) {
 +    query_context_proto_->set_query_id(query_handle->query_id());
  #ifdef QUICKSTEP_DISTRIBUTED
      catalog_database_cache_proto_ = DCHECK_NOTNULL(query_handle->getCatalogDatabaseCacheProtoMutable());
  #endif

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --cc query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index 930087a,56b53ba..59ba5c1
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@@ -94,20 -90,13 +94,23 @@@ void ExecutionGeneratorTestRunner::runT
              physical_generator.generatePlan(
                  logical_generator.generatePlan(*result.parsed_statement));
          execution_generator.generatePlan(physical_plan);
 -        foreman_->setQueryPlan(
 -            query_handle.getQueryPlanMutable()->getQueryPlanDAGMutable());
  
-         AdmitRequestMessage request_message(&query_handle);
-         TaggedMessage admit_tagged_message(
-             &request_message, sizeof(request_message), kAdmitRequestMessage);
-         QueryExecutionUtil::SendTMBMessage(&bus_,
-                                            main_thread_client_id_,
-                                            foreman_->getBusClientID(),
-                                            std::move(admit_tagged_message));
 -        foreman_->reconstructQueryContextFromProto(query_handle.getQueryContextProto());
++        const tmb::MessageBus::SendStatus send_status =
++            QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
++                main_thread_client_id_,
++                foreman_->getBusClientID(),
++                &query_handle,
++                &bus_);
++        if (send_status != tmb::MessageBus::SendStatus::kOK) {
++          std::fprintf(stderr, "\nQuery %s could not be admitted to the system\n", input.c_str());
++          continue;
++        }
  
 -        foreman_->start();
 -        foreman_->join();
 +        // Receive workload completion message from Foreman.
 +        const AnnotatedMessage annotated_msg =
 +            bus_.Receive(main_thread_client_id_, 0, true);
 +        const TaggedMessage &tagged_message = annotated_msg.tagged_message;
 +        DCHECK(tagged_message.message_type() == kWorkloadCompletionMessage);
  
          const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
          if (query_result_relation) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bef0ae1d/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------