You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/10/05 22:02:57 UTC

[02/51] [abbrv] incubator-quickstep git commit: Add "COPY TO" operator for exporting data from Quickstep.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/parser/preprocessed/SqlParser_gen.hpp
----------------------------------------------------------------------
diff --git a/parser/preprocessed/SqlParser_gen.hpp b/parser/preprocessed/SqlParser_gen.hpp
index a6d12e2..f6b5247 100644
--- a/parser/preprocessed/SqlParser_gen.hpp
+++ b/parser/preprocessed/SqlParser_gen.hpp
@@ -94,90 +94,91 @@ extern int quickstep_yydebug;
     TOKEN_DECIMAL = 304,
     TOKEN_DEFAULT = 305,
     TOKEN_DELETE = 306,
-    TOKEN_DELIMITER = 307,
-    TOKEN_DESC = 308,
-    TOKEN_DISTINCT = 309,
-    TOKEN_DOUBLE = 310,
-    TOKEN_DROP = 311,
-    TOKEN_ELSE = 312,
-    TOKEN_END = 313,
-    TOKEN_ESCAPE_STRINGS = 314,
-    TOKEN_EXISTS = 315,
-    TOKEN_EXTRACT = 316,
-    TOKEN_FALSE = 317,
-    TOKEN_FIRST = 318,
-    TOKEN_FLOAT = 319,
-    TOKEN_FOLLOWING = 320,
-    TOKEN_FOR = 321,
-    TOKEN_FOREIGN = 322,
-    TOKEN_FROM = 323,
-    TOKEN_FULL = 324,
-    TOKEN_GROUP = 325,
-    TOKEN_HASH = 326,
-    TOKEN_HAVING = 327,
-    TOKEN_HOUR = 328,
-    TOKEN_IN = 329,
-    TOKEN_INDEX = 330,
-    TOKEN_INNER = 331,
-    TOKEN_INSERT = 332,
-    TOKEN_INTEGER = 333,
-    TOKEN_INTERVAL = 334,
-    TOKEN_INTO = 335,
-    TOKEN_JOIN = 336,
-    TOKEN_KEY = 337,
-    TOKEN_LAST = 338,
-    TOKEN_LEFT = 339,
-    TOKEN_LIMIT = 340,
-    TOKEN_LONG = 341,
-    TOKEN_MINUTE = 342,
-    TOKEN_MONTH = 343,
-    TOKEN_NULL = 344,
-    TOKEN_NULLS = 345,
-    TOKEN_OFF = 346,
-    TOKEN_ON = 347,
-    TOKEN_ORDER = 348,
-    TOKEN_OUTER = 349,
-    TOKEN_OVER = 350,
-    TOKEN_PARTITION = 351,
-    TOKEN_PARTITIONS = 352,
-    TOKEN_PERCENT = 353,
-    TOKEN_PRECEDING = 354,
-    TOKEN_PRIMARY = 355,
-    TOKEN_PRIORITY = 356,
-    TOKEN_QUIT = 357,
-    TOKEN_RANGE = 358,
-    TOKEN_REAL = 359,
-    TOKEN_REFERENCES = 360,
-    TOKEN_RIGHT = 361,
-    TOKEN_ROW = 362,
-    TOKEN_ROW_DELIMITER = 363,
-    TOKEN_ROWS = 364,
-    TOKEN_SECOND = 365,
-    TOKEN_SELECT = 366,
-    TOKEN_SET = 367,
-    TOKEN_SMA = 368,
-    TOKEN_SMALLINT = 369,
+    TOKEN_DESC = 307,
+    TOKEN_DISTINCT = 308,
+    TOKEN_DOUBLE = 309,
+    TOKEN_DROP = 310,
+    TOKEN_ELSE = 311,
+    TOKEN_END = 312,
+    TOKEN_EXISTS = 313,
+    TOKEN_EXTRACT = 314,
+    TOKEN_FALSE = 315,
+    TOKEN_FIRST = 316,
+    TOKEN_FLOAT = 317,
+    TOKEN_FOLLOWING = 318,
+    TOKEN_FOR = 319,
+    TOKEN_FOREIGN = 320,
+    TOKEN_FROM = 321,
+    TOKEN_FULL = 322,
+    TOKEN_GROUP = 323,
+    TOKEN_HASH = 324,
+    TOKEN_HAVING = 325,
+    TOKEN_HOUR = 326,
+    TOKEN_IN = 327,
+    TOKEN_INDEX = 328,
+    TOKEN_INNER = 329,
+    TOKEN_INSERT = 330,
+    TOKEN_INTEGER = 331,
+    TOKEN_INTERVAL = 332,
+    TOKEN_INTO = 333,
+    TOKEN_JOIN = 334,
+    TOKEN_KEY = 335,
+    TOKEN_LAST = 336,
+    TOKEN_LEFT = 337,
+    TOKEN_LIMIT = 338,
+    TOKEN_LONG = 339,
+    TOKEN_MINUTE = 340,
+    TOKEN_MONTH = 341,
+    TOKEN_NULL = 342,
+    TOKEN_NULLS = 343,
+    TOKEN_OFF = 344,
+    TOKEN_ON = 345,
+    TOKEN_ORDER = 346,
+    TOKEN_OUTER = 347,
+    TOKEN_OVER = 348,
+    TOKEN_PARTITION = 349,
+    TOKEN_PARTITIONS = 350,
+    TOKEN_PERCENT = 351,
+    TOKEN_PRECEDING = 352,
+    TOKEN_PRIMARY = 353,
+    TOKEN_PRIORITY = 354,
+    TOKEN_QUIT = 355,
+    TOKEN_RANGE = 356,
+    TOKEN_REAL = 357,
+    TOKEN_REFERENCES = 358,
+    TOKEN_RIGHT = 359,
+    TOKEN_ROW = 360,
+    TOKEN_ROW_DELIMITER = 361,
+    TOKEN_ROWS = 362,
+    TOKEN_SECOND = 363,
+    TOKEN_SELECT = 364,
+    TOKEN_SET = 365,
+    TOKEN_SMA = 366,
+    TOKEN_SMALLINT = 367,
+    TOKEN_STDERR = 368,
+    TOKEN_STDOUT = 369,
     TOKEN_SUBSTRING = 370,
     TOKEN_TABLE = 371,
     TOKEN_THEN = 372,
     TOKEN_TIME = 373,
     TOKEN_TIMESTAMP = 374,
-    TOKEN_TRUE = 375,
-    TOKEN_TUPLESAMPLE = 376,
-    TOKEN_UNBOUNDED = 377,
-    TOKEN_UNIQUE = 378,
-    TOKEN_UPDATE = 379,
-    TOKEN_USING = 380,
-    TOKEN_VALUES = 381,
-    TOKEN_VARCHAR = 382,
-    TOKEN_WHEN = 383,
-    TOKEN_WHERE = 384,
-    TOKEN_WINDOW = 385,
-    TOKEN_WITH = 386,
-    TOKEN_YEAR = 387,
-    TOKEN_YEARMONTH = 388,
-    TOKEN_EOF = 389,
-    TOKEN_LEX_ERROR = 390
+    TOKEN_TO = 375,
+    TOKEN_TRUE = 376,
+    TOKEN_TUPLESAMPLE = 377,
+    TOKEN_UNBOUNDED = 378,
+    TOKEN_UNIQUE = 379,
+    TOKEN_UPDATE = 380,
+    TOKEN_USING = 381,
+    TOKEN_VALUES = 382,
+    TOKEN_VARCHAR = 383,
+    TOKEN_WHEN = 384,
+    TOKEN_WHERE = 385,
+    TOKEN_WINDOW = 386,
+    TOKEN_WITH = 387,
+    TOKEN_YEAR = 388,
+    TOKEN_YEARMONTH = 389,
+    TOKEN_EOF = 390,
+    TOKEN_LEX_ERROR = 391
   };
 #endif
 
@@ -237,8 +238,7 @@ union YYSTYPE
   quickstep::ParseKeyStringValue *key_string_value_;
   quickstep::ParseKeyStringList *key_string_list_;
   quickstep::ParseKeyIntegerValue *key_integer_value_;
-
-  quickstep::ParseCopyFromParams *copy_from_params_;
+  quickstep::ParseKeyBoolValue *key_bool_value_;
 
   quickstep::ParseAssignment *assignment_;
   quickstep::PtrList<quickstep::ParseAssignment> *assignment_list_;
@@ -251,7 +251,7 @@ union YYSTYPE
   quickstep::ParseStatementUpdate *update_statement_;
   quickstep::ParseStatementInsert *insert_statement_;
   quickstep::ParseStatementDelete *delete_statement_;
-  quickstep::ParseStatementCopyFrom *copy_from_statement_;
+  quickstep::ParseStatementCopy *copy_statement_;
   quickstep::ParseStatementCreateTable *create_table_statement_;
   quickstep::ParsePartitionClause *partition_clause_;
   quickstep::ParseBlockProperties *block_properties_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/parser/tests/Copy.test
----------------------------------------------------------------------
diff --git a/parser/tests/Copy.test b/parser/tests/Copy.test
index dccaa82..7d1677d 100644
--- a/parser/tests/Copy.test
+++ b/parser/tests/Copy.test
@@ -24,34 +24,131 @@ COPY test FROM 'test.txt' WITH ()
 
 COPY test FROM 'test.txt'
 --
-CopyFromStatement[relation_name=test,source_file=test.txt]
+CopyStatement[direction=FROM,file=test.txt,relation_name=test]
 ==
 
 COPY test FROM 'test.txt' WITH (DELIMITER 'd', ESCAPE_STRINGS FALSE)
 --
-CopyFromStatement[relation_name=test,source_file=test.txt]
-+-params=CopyFromParams[delimiter=d,escape_string=false]
+CopyStatement[direction=FROM,file=test.txt,relation_name=test]
++-params=
+  +-KeyStringValue[key=DELIMITER]
+  | +-value=String[value=d]
+  +-KeyBoolValue[key=ESCAPE_STRINGS,value=false]
 ==
 
 COPY test FROM 'test.txt' WITH (DELIMITER '123', ESCAPE_STRINGS FALSE)
 --
-CopyFromStatement[relation_name=test,source_file=test.txt]
-+-params=CopyFromParams[delimiter=123,escape_string=false]
+CopyStatement[direction=FROM,file=test.txt,relation_name=test]
++-params=
+  +-KeyStringValue[key=DELIMITER]
+  | +-value=String[value=123]
+  +-KeyBoolValue[key=ESCAPE_STRINGS,value=false]
 ==
 
 COPY test FROM 'test.txt' WITH (DELIMITER e'\t')
 --
-CopyFromStatement[relation_name=test,source_file=test.txt]
-+-params=CopyFromParams[delimiter=	,escape_string=true]
+CopyStatement[direction=FROM,file=test.txt,relation_name=test]
++-params=
+  +-KeyStringValue[key=DELIMITER]
+    +-value=String[value=	]
 ==
 
 COPY test FROM 'test.txt' WITH (ESCAPE_STRINGS FALSE, DELIMITER 'd')
 --
-CopyFromStatement[relation_name=test,source_file=test.txt]
-+-params=CopyFromParams[delimiter=d,escape_string=false]
+CopyStatement[direction=FROM,file=test.txt,relation_name=test]
++-params=
+  +-KeyBoolValue[key=ESCAPE_STRINGS,value=false]
+  +-KeyStringValue[key=DELIMITER]
+    +-value=String[value=d]
 ==
 
 COPY test FROM 'test.txt' WITH (DELIMITER '1', ESCAPE_STRINGS FALSE, DELIMITER '2', ESCAPE_STRINGS TRUE)
 --
-CopyFromStatement[relation_name=test,source_file=test.txt]
-+-params=CopyFromParams[delimiter=2,escape_string=true]
+CopyStatement[direction=FROM,file=test.txt,relation_name=test]
++-params=
+  +-KeyStringValue[key=DELIMITER]
+  | +-value=String[value=1]
+  +-KeyBoolValue[key=ESCAPE_STRINGS,value=false]
+  +-KeyStringValue[key=DELIMITER]
+  | +-value=String[value=2]
+  +-KeyBoolValue[key=ESCAPE_STRINGS,value=true]
+==
+
+COPY test TO 'test.txt';
+--
+CopyStatement[direction=TO,file=@test.txt,relation_name=test]
+==
+
+COPY test TO stdout;
+--
+CopyStatement[direction=TO,file=$stdout,relation_name=test]
+==
+
+COPY test TO stderr;
+--
+CopyStatement[direction=TO,file=$stderr,relation_name=test]
+==
+
+COPY test TO 'stdout';
+--
+CopyStatement[direction=TO,file=@stdout,relation_name=test]
+==
+
+COPY test TO 'test.txt' WITH (FORMAT 'TEXT');
+--
+CopyStatement[direction=TO,file=@test.txt,relation_name=test]
++-params=
+  +-KeyStringValue[key=FORMAT]
+    +-value=String[value=TEXT]
+==
+
+COPY test TO 'test.txt' WITH (FORMAT 'CSV');
+--
+CopyStatement[direction=TO,file=@test.txt,relation_name=test]
++-params=
+  +-KeyStringValue[key=FORMAT]
+    +-value=String[value=CSV]
+==
+
+COPY test TO stdout
+WITH (FORMAT 'CSV', DELIMITER e'\t', HEADER TRUE, QUOTE '$', NULL_STRING 'NULL');
+--
+CopyStatement[direction=TO,file=$stdout,relation_name=test]
++-params=
+  +-KeyStringValue[key=FORMAT]
+  | +-value=String[value=CSV]
+  +-KeyStringValue[key=DELIMITER]
+  | +-value=String[value=	]
+  +-KeyBoolValue[key=HEADER,value=true]
+  +-KeyStringValue[key=QUOTE]
+  | +-value=String[value=$]
+  +-KeyStringValue[key=NULL_STRING]
+    +-value=String[value=NULL]
+==
+
+COPY
+  SELECT SUM(int_col) AS sum_int,
+         AVG(double_col) AS avg_dbl
+  FROM test
+  GROUP BY char_col
+TO 'test.txt' WITH (DELIMITER ',');
+--
+CopyStatement[direction=TO,file=@test.txt]
++-set_operation_query=SetOperation[set_operation_type=Select]
+| +-children=
+|   +-Select
+|     +-select_clause=SelectList
+|     | +-SelectListItem[alias=sum_int]
+|     | | +-FunctionCall[name=SUM]
+|     | |   +-AttributeReference[attribute_name=int_col]
+|     | +-SelectListItem[alias=avg_dbl]
+|     |   +-FunctionCall[name=AVG]
+|     |     +-AttributeReference[attribute_name=double_col]
+|     +-group_by=GroupBy
+|     | +-AttributeReference[attribute_name=char_col]
+|     +-from_clause=
+|       +-TableReference[table=test]
++-params=
+  +-KeyStringValue[key=DELIMITER]
+    +-value=String[value=,]
+==

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index fdf8796..4ea21b2 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -94,6 +94,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       quickstep_queryoptimizer_physical_Aggregate
                       quickstep_queryoptimizer_physical_CopyFrom
+                      quickstep_queryoptimizer_physical_CopyTo
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
                       quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
@@ -140,6 +141,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_relationaloperators_SelectOperator
                       quickstep_relationaloperators_SortMergeRunOperator
                       quickstep_relationaloperators_SortRunGenerationOperator
+                      quickstep_relationaloperators_TableExportOperator
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_UnionAllOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index d82a0c7..372d576 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -78,6 +78,7 @@
 #include "query_optimizer/expressions/WindowAggregateFunction.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
+#include "query_optimizer/physical/CopyTo.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
 #include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
@@ -124,6 +125,7 @@
 #include "relational_operators/SelectOperator.hpp"
 #include "relational_operators/SortMergeRunOperator.hpp"
 #include "relational_operators/SortRunGenerationOperator.hpp"
+#include "relational_operators/TableExportOperator.hpp"
 #include "relational_operators/TableGeneratorOperator.hpp"
 #include "relational_operators/TextScanOperator.hpp"
 #include "relational_operators/UnionAllOperator.hpp"
@@ -409,6 +411,9 @@ void ExecutionGenerator::generatePlanInternal(
     case P::PhysicalType::kCopyFrom:
       return convertCopyFrom(
           std::static_pointer_cast<const P::CopyFrom>(physical_plan));
+    case P::PhysicalType::kCopyTo:
+      return convertCopyTo(
+          std::static_pointer_cast<const P::CopyTo>(physical_plan));
     case P::PhysicalType::kCreateIndex:
       return convertCreateIndex(
           std::static_pointer_cast<const P::CreateIndex>(physical_plan));
@@ -1223,8 +1228,7 @@ void ExecutionGenerator::convertCopyFrom(
           new TextScanOperator(
               query_handle_->query_id(),
               physical_plan->file_name(),
-              physical_plan->column_delimiter(),
-              physical_plan->escape_strings(),
+              physical_plan->options(),
               *output_relation,
               insert_destination_index));
   insert_destination_proto->set_relational_op_index(scan_operator_index);
@@ -1239,6 +1243,40 @@ void ExecutionGenerator::convertCopyFrom(
                                        false /* is_pipeline_breaker */);
 }
 
+void ExecutionGenerator::convertCopyTo(const P::CopyToPtr &physical_plan) {
+  // CopyTo is converted to a TableExport operator.
+
+  const CatalogRelation *input_relation;
+  bool input_relation_is_stored;
+
+  const P::PhysicalPtr &input = physical_plan->input();
+  P::TableReferencePtr table_reference;
+  const CatalogRelationInfo *input_relation_info = nullptr;
+  if (P::SomeTableReference::MatchesWithConditionalCast(input, &table_reference)) {
+    input_relation = table_reference->relation();
+    input_relation_is_stored = true;
+  } else {
+    input_relation_info = findRelationInfoOutputByPhysical(input);
+    input_relation = input_relation_info->relation;
+    input_relation_is_stored = false;
+  }
+
+  DCHECK(input_relation != nullptr);
+  const QueryPlan::DAGNodeIndex table_export_operator_index =
+      execution_plan_->addRelationalOperator(
+          new TableExportOperator(query_handle_->query_id(),
+                                  *input_relation,
+                                  input_relation_is_stored,
+                                  physical_plan->file_name(),
+                                  physical_plan->options()));
+  if (!input_relation_is_stored) {
+    DCHECK(input_relation_info != nullptr);
+    execution_plan_->addDirectDependency(table_export_operator_index,
+                                         input_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+}
+
 void ExecutionGenerator::convertCreateIndex(
   const P::CreateIndexPtr &physical_plan) {
   // CreateIndex is converted to a CreateIndex operator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 19e75c1..bc9f88b 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -44,6 +44,7 @@
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
+#include "query_optimizer/physical/CopyTo.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
 #include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
@@ -282,6 +283,13 @@ class ExecutionGenerator {
   void convertCopyFrom(const physical::CopyFromPtr &physical_plan);
 
   /**
+   * @brief Converts a CopyTo to a TableExport operator.
+   *
+   * @param physical_plan The CopyTo to be converted.
+   */
+  void convertCopyTo(const physical::CopyToPtr &physical_plan);
+
+  /**
    * @brief Converts a CreateIndex to a CreateIndex operator.
    *
    * @param physical_plan The CreateIndex to be converted.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/logical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CMakeLists.txt b/query_optimizer/logical/CMakeLists.txt
index 4480e0b..37e9735 100644
--- a/query_optimizer/logical/CMakeLists.txt
+++ b/query_optimizer/logical/CMakeLists.txt
@@ -19,6 +19,7 @@
 add_library(quickstep_queryoptimizer_logical_Aggregate Aggregate.cpp Aggregate.hpp)
 add_library(quickstep_queryoptimizer_logical_BinaryJoin BinaryJoin.cpp BinaryJoin.hpp)
 add_library(quickstep_queryoptimizer_logical_CopyFrom CopyFrom.cpp CopyFrom.hpp)
+add_library(quickstep_queryoptimizer_logical_CopyTo CopyTo.cpp CopyTo.hpp)
 add_library(quickstep_queryoptimizer_logical_CreateIndex CreateIndex.cpp CreateIndex.hpp)
 add_library(quickstep_queryoptimizer_logical_CreateTable CreateTable.cpp CreateTable.hpp)
 add_library(quickstep_queryoptimizer_logical_DeleteTuples DeleteTuples.cpp DeleteTuples.hpp)
@@ -74,6 +75,16 @@ target_link_libraries(quickstep_queryoptimizer_logical_CopyFrom
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_logical_LogicalType
+                      quickstep_utility_BulkIoConfiguration
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)
+target_link_libraries(quickstep_queryoptimizer_logical_CopyTo
+                      glog
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_logical_Logical
+                      quickstep_queryoptimizer_logical_LogicalType
+                      quickstep_utility_BulkIoConfiguration
                       quickstep_utility_Macros
                       quickstep_utility_StringUtil)
 target_link_libraries(quickstep_queryoptimizer_logical_CreateIndex
@@ -290,6 +301,7 @@ target_link_libraries(quickstep_queryoptimizer_logical
                       quickstep_queryoptimizer_logical_Aggregate
                       quickstep_queryoptimizer_logical_BinaryJoin
                       quickstep_queryoptimizer_logical_CopyFrom
+                      quickstep_queryoptimizer_logical_CopyTo
                       quickstep_queryoptimizer_logical_CreateIndex
                       quickstep_queryoptimizer_logical_CreateTable
                       quickstep_queryoptimizer_logical_DeleteTuples

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/logical/CopyFrom.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CopyFrom.cpp b/query_optimizer/logical/CopyFrom.cpp
index b0a1423..a80c701 100644
--- a/query_optimizer/logical/CopyFrom.cpp
+++ b/query_optimizer/logical/CopyFrom.cpp
@@ -44,11 +44,11 @@ void CopyFrom::getFieldStringItems(
   inline_field_values->push_back(file_name_);
 
   inline_field_names->push_back("column_delimiter");
-  inline_field_values->push_back("\"" + EscapeSpecialChars(std::string(1, column_delimiter_)) +
-                                 "\"");
+  inline_field_values->push_back(
+      "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + "\"");
 
   inline_field_names->push_back("escape_strings");
-  inline_field_values->push_back(escape_strings_ ? "true" : "false");
+  inline_field_values->push_back(options_->escapeStrings() ? "true" : "false");
 }
 
 }  // namespace logical

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/logical/CopyFrom.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CopyFrom.hpp b/query_optimizer/logical/CopyFrom.hpp
index 7c5907f..6b130a6 100644
--- a/query_optimizer/logical/CopyFrom.hpp
+++ b/query_optimizer/logical/CopyFrom.hpp
@@ -17,8 +17,8 @@
  * under the License.
  **/
 
-#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPYFROM_HPP_
-#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPYFROM_HPP_
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_FROM_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_FROM_HPP_
 
 #include <memory>
 #include <string>
@@ -28,6 +28,7 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/logical/LogicalType.hpp"
+#include "utility/BulkIoConfiguration.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -66,20 +67,14 @@ class CopyFrom : public Logical {
   const std::string& file_name() const { return file_name_; }
 
   /**
-   * @return The delimiter used in the text file to separate columns.
+   * @return The options for this COPY FROM statement.
    */
-  const char column_delimiter() const { return column_delimiter_; }
-
-  /**
-   * @return Whether to decode escape sequences in the text file.
-   */
-  bool escape_strings() const { return escape_strings_; }
+  const BulkIoConfigurationPtr& options() const { return options_; }
 
   LogicalPtr copyWithNewChildren(
       const std::vector<LogicalPtr> &new_children) const override {
     DCHECK(new_children.empty());
-    return Create(catalog_relation_, file_name_, column_delimiter_,
-                  escape_strings_);
+    return Create(catalog_relation_, file_name_, options_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override {
@@ -95,19 +90,13 @@ class CopyFrom : public Logical {
    *
    * @param catalog_relation The catalog relation to insert the tuples to.
    * @param file_name The name of the file to read the data from.
-   * @param column_delimiter The delimiter used in the text file to separate
-   *                         columns.
-   * @param escape_strings Whether to decode escape sequences in the text file.
+   * @param options The options for this COPY FROM statement.
    * @return An immutable CopyFrom logical node.
    */
   static CopyFromPtr Create(const CatalogRelation *catalog_relation,
                             const std::string &file_name,
-                            const char column_delimiter,
-                            bool escape_strings) {
-    return CopyFromPtr(new CopyFrom(catalog_relation,
-                                    file_name,
-                                    column_delimiter,
-                                    escape_strings));
+                            const BulkIoConfigurationPtr &options) {
+    return CopyFromPtr(new CopyFrom(catalog_relation, file_name, options));
   }
 
  protected:
@@ -122,18 +111,14 @@ class CopyFrom : public Logical {
  private:
   CopyFrom(const CatalogRelation *catalog_relation,
            const std::string &file_name,
-           const char column_delimiter,
-           bool escape_strings)
+           const BulkIoConfigurationPtr &options)
       : catalog_relation_(catalog_relation),
         file_name_(file_name),
-        column_delimiter_(column_delimiter),
-        escape_strings_(escape_strings) {}
+        options_(options) {}
 
   const CatalogRelation *catalog_relation_;
-  std::string file_name_;
-
-  const char column_delimiter_;
-  const bool escape_strings_;
+  const std::string file_name_;
+  const BulkIoConfigurationPtr options_;
 
   DISALLOW_COPY_AND_ASSIGN(CopyFrom);
 };
@@ -144,4 +129,4 @@ class CopyFrom : public Logical {
 }  // namespace optimizer
 }  // namespace quickstep
 
-#endif /* QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPYFROM_HPP_ */
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_FROM_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/logical/CopyTo.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CopyTo.cpp b/query_optimizer/logical/CopyTo.cpp
new file mode 100644
index 0000000..369f732
--- /dev/null
+++ b/query_optimizer/logical/CopyTo.cpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/logical/CopyTo.hpp"
+
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "utility/StringUtil.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace logical {
+
+void CopyTo::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  inline_field_names->push_back("file_name");
+  inline_field_values->push_back(file_name_);
+
+  non_container_child_field_names->push_back("input");
+  non_container_child_fields->push_back(input_);
+
+  inline_field_names->push_back("format");
+  inline_field_values->push_back(options_->getFormatName());
+
+  inline_field_names->push_back("column_delimiter");
+  inline_field_values->push_back(
+      "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + "\"");
+
+  if (options_->escapeStrings()) {
+    inline_field_names->push_back("escape_strings");
+    inline_field_values->push_back("true");
+  }
+
+  if (options_->hasHeader()) {
+    inline_field_names->push_back("header");
+    inline_field_values->push_back("true");
+  }
+
+  if (options_->getQuoteCharacter() != 0) {
+    inline_field_names->push_back("quote");
+    inline_field_values->push_back(std::string(1, options_->getQuoteCharacter()));
+  }
+
+  if (options_->getNullString() != "") {
+    inline_field_names->push_back("null_string");
+    inline_field_values->push_back(options_->getNullString());
+  }
+}
+
+}  // namespace logical
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/logical/CopyTo.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CopyTo.hpp b/query_optimizer/logical/CopyTo.hpp
new file mode 100644
index 0000000..33060a6
--- /dev/null
+++ b/query_optimizer/logical/CopyTo.hpp
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_TO_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_TO_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/logical/Logical.hpp"
+#include "query_optimizer/logical/LogicalType.hpp"
+#include "utility/BulkIoConfiguration.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace logical {
+
+/** \addtogroup OptimizerLogical
+ *  @{
+ */
+
+class CopyTo;
+typedef std::shared_ptr<const CopyTo> CopyToPtr;
+
+/**
+ * @brief Represents an operation that copies data from a relation to a text file.
+ */
+class CopyTo : public Logical {
+ public:
+  LogicalType getLogicalType() const override {
+    return LogicalType::kCopyTo;
+  }
+
+  std::string getName() const override {
+    return "CopyTo";
+  }
+
+  /**
+   * @return The input relation whose data is to be exported.
+   */
+  const LogicalPtr& input() const {
+    return input_;
+  }
+
+  /**
+   * @return The name of the file to write the data to.
+   */
+  const std::string& file_name() const {
+    return file_name_;
+  }
+
+  /**
+   * @return The options for this COPY TO statement.
+   */
+  const BulkIoConfigurationPtr& options() const {
+    return options_;
+  }
+
+  LogicalPtr copyWithNewChildren(
+      const std::vector<LogicalPtr> &new_children) const override {
+    DCHECK_EQ(1u, new_children.size());
+    return Create(new_children.front(), file_name_, options_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override {
+    return {};
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override {
+    return input_->getOutputAttributes();
+  }
+
+  /**
+   * @brief Creates a CopyTo logical node.
+   *
+   * @param input The input relation whose data is to be exported.
+   * @param file_name The name of the file to write the data to.
+   * @param options The options for this COPY TO statement.
+   * @return An immutable CopyTo logical node.
+   */
+  static CopyToPtr Create(const LogicalPtr &input,
+                          const std::string &file_name,
+                          const BulkIoConfigurationPtr &options) {
+    return CopyToPtr(new CopyTo(input, file_name, options));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  CopyTo(const LogicalPtr &input,
+         const std::string &file_name,
+         const BulkIoConfigurationPtr &options)
+      : input_(input),
+        file_name_(file_name),
+        options_(options) {
+    addChild(input);
+  }
+
+  const LogicalPtr input_;
+  const std::string file_name_;
+  const BulkIoConfigurationPtr options_;
+
+  DISALLOW_COPY_AND_ASSIGN(CopyTo);
+};
+
+/** @} */
+
+}  // namespace logical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_COPY_TO_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/logical/LogicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/LogicalType.hpp b/query_optimizer/logical/LogicalType.hpp
index 21ffdca..d8b85dd 100644
--- a/query_optimizer/logical/LogicalType.hpp
+++ b/query_optimizer/logical/LogicalType.hpp
@@ -32,8 +32,9 @@ namespace logical {
  * @brief Optimizer logical node types.
  **/
 enum class LogicalType {
-  kAggregate,
+  kAggregate = 0,
   kCopyFrom,
+  kCopyTo,
   kCreateIndex,
   kCreateTable,
   kDeleteTuples,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index e510f6b..a1a72f7 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -19,6 +19,7 @@
 add_library(quickstep_queryoptimizer_physical_Aggregate Aggregate.cpp Aggregate.hpp)
 add_library(quickstep_queryoptimizer_physical_BinaryJoin BinaryJoin.cpp BinaryJoin.hpp)
 add_library(quickstep_queryoptimizer_physical_CopyFrom CopyFrom.cpp CopyFrom.hpp)
+add_library(quickstep_queryoptimizer_physical_CopyTo CopyTo.cpp CopyTo.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp CreateIndex.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp CreateTable.hpp)
 add_library(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
@@ -81,7 +82,18 @@ target_link_libraries(quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_expressions_NamedExpression
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
-                      quickstep_utility_Macros)
+                      quickstep_utility_BulkIoConfiguration
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)
+target_link_libraries(quickstep_queryoptimizer_physical_CopyTo
+                      glog
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_utility_BulkIoConfiguration
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)
 target_link_libraries(quickstep_queryoptimizer_physical_CreateIndex
                       glog
                       quickstep_queryoptimizer_OptimizerTree
@@ -327,6 +339,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
                       quickstep_queryoptimizer_physical_Aggregate
                       quickstep_queryoptimizer_physical_BinaryJoin
                       quickstep_queryoptimizer_physical_CopyFrom
+                      quickstep_queryoptimizer_physical_CopyTo
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
                       quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/physical/CopyFrom.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CopyFrom.cpp b/query_optimizer/physical/CopyFrom.cpp
index 8448d4e..65279fe 100644
--- a/query_optimizer/physical/CopyFrom.cpp
+++ b/query_optimizer/physical/CopyFrom.cpp
@@ -24,6 +24,7 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "query_optimizer/OptimizerTree.hpp"
+#include "utility/StringUtil.hpp"
 
 namespace quickstep {
 namespace optimizer {
@@ -43,10 +44,11 @@ void CopyFrom::getFieldStringItems(
   inline_field_values->push_back(file_name_);
 
   inline_field_names->push_back("column_delimiter");
-  inline_field_values->push_back(std::string(1, column_delimiter_));
+  inline_field_values->push_back(
+      "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + "\"");
 
   inline_field_names->push_back("escape_strings");
-  inline_field_values->push_back(escape_strings_ ? "true" : "false");
+  inline_field_values->push_back(options_->escapeStrings() ? "true" : "false");
 }
 
 }  // namespace physical

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/physical/CopyFrom.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CopyFrom.hpp b/query_optimizer/physical/CopyFrom.hpp
index ecbf318..c9bbcdf 100644
--- a/query_optimizer/physical/CopyFrom.hpp
+++ b/query_optimizer/physical/CopyFrom.hpp
@@ -17,8 +17,8 @@
  * under the License.
  **/
 
-#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPYFROM_HPP_
-#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPYFROM_HPP_
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_FROM_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_FROM_HPP_
 
 #include <memory>
 #include <string>
@@ -30,6 +30,7 @@
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/BulkIoConfiguration.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -68,22 +69,14 @@ class CopyFrom : public Physical {
   const std::string& file_name() const { return file_name_; }
 
   /**
-   * @return The delimiter used in the text file to separate columns.
+   * @return The options for this COPY FROM statement.
    */
-  const char column_delimiter() const { return column_delimiter_; }
-
-  /**
-   * @return Whether to decode escape sequences in the text file.
-   */
-  bool escape_strings() const { return escape_strings_; }
+  const BulkIoConfigurationPtr& options() const { return options_; }
 
   PhysicalPtr copyWithNewChildren(
       const std::vector<PhysicalPtr> &new_children) const override {
     DCHECK(new_children.empty());
-    return Create(catalog_relation_,
-                  file_name_,
-                  column_delimiter_,
-                  escape_strings_);
+    return Create(catalog_relation_, file_name_, options_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override {
@@ -112,12 +105,8 @@ class CopyFrom : public Physical {
    */
   static CopyFromPtr Create(const CatalogRelation *catalog_relation,
                             const std::string &file_name,
-                            const char &column_delimiter,
-                            bool escape_strings) {
-    return CopyFromPtr(new CopyFrom(catalog_relation,
-                                    file_name,
-                                    column_delimiter,
-                                    escape_strings));
+                            const BulkIoConfigurationPtr &options) {
+    return CopyFromPtr(new CopyFrom(catalog_relation, file_name, options));
   }
 
  protected:
@@ -132,18 +121,14 @@ class CopyFrom : public Physical {
  private:
   CopyFrom(const CatalogRelation *catalog_relation,
            const std::string &file_name,
-           const char column_delimiter,
-           bool escape_strings)
+           const BulkIoConfigurationPtr &options)
       : catalog_relation_(catalog_relation),
         file_name_(file_name),
-        column_delimiter_(column_delimiter),
-        escape_strings_(escape_strings) {}
+        options_(options) {}
 
   const CatalogRelation *catalog_relation_;
-  std::string file_name_;
-
-  const char column_delimiter_;
-  const bool escape_strings_;
+  const std::string file_name_;
+  const BulkIoConfigurationPtr options_;
 
   DISALLOW_COPY_AND_ASSIGN(CopyFrom);
 };
@@ -154,4 +139,4 @@ class CopyFrom : public Physical {
 }  // namespace optimizer
 }  // namespace quickstep
 
-#endif /* QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPYFROM_HPP_ */
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_FROM_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/physical/CopyTo.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CopyTo.cpp b/query_optimizer/physical/CopyTo.cpp
new file mode 100644
index 0000000..9cd954e
--- /dev/null
+++ b/query_optimizer/physical/CopyTo.cpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/physical/CopyTo.hpp"
+
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "utility/StringUtil.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+void CopyTo::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  inline_field_names->push_back("file_name");
+  inline_field_values->push_back(file_name_);
+
+  non_container_child_field_names->push_back("input");
+  non_container_child_fields->push_back(input_);
+
+  inline_field_names->push_back("format");
+  inline_field_values->push_back(options_->getFormatName());
+
+  inline_field_names->push_back("column_delimiter");
+  inline_field_values->push_back(
+      "\"" + EscapeSpecialChars(std::string(1, options_->getDelimiter())) + "\"");
+
+  if (options_->escapeStrings()) {
+    inline_field_names->push_back("escape_strings");
+    inline_field_values->push_back("true");
+  }
+
+  if (options_->hasHeader()) {
+    inline_field_names->push_back("header");
+    inline_field_values->push_back("true");
+  }
+
+  if (options_->getQuoteCharacter() != 0) {
+    inline_field_names->push_back("quote");
+    inline_field_values->push_back(std::string(1, options_->getQuoteCharacter()));
+  }
+
+  if (options_->getNullString() != "") {
+    inline_field_names->push_back("null_string");
+    inline_field_values->push_back(options_->getNullString());
+  }
+}
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/physical/CopyTo.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CopyTo.hpp b/query_optimizer/physical/CopyTo.hpp
new file mode 100644
index 0000000..004d4f8
--- /dev/null
+++ b/query_optimizer/physical/CopyTo.hpp
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_TO_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_TO_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/BulkIoConfiguration.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerPhysical
+ *  @{
+ */
+
+class CopyTo;
+typedef std::shared_ptr<const CopyTo> CopyToPtr;
+
+/**
+ * @brief Represents an operation that copies data from a relation to a text file.
+ */
+class CopyTo : public Physical {
+ public:
+  PhysicalType getPhysicalType() const override {
+    return PhysicalType::kCopyTo;
+  }
+
+  std::string getName() const override {
+    return "CopyTo";
+  }
+
+  /**
+   * @return The input relation whose data is to be exported.
+   */
+  const PhysicalPtr& input() const {
+    return input_;
+  }
+
+  /**
+   * @return The name of the file to write the data to.
+   */
+  const std::string& file_name() const {
+    return file_name_;
+  }
+
+  /**
+   * @return The options for this COPY TO statement.
+   */
+  const BulkIoConfigurationPtr& options() const {
+    return options_;
+  }
+
+  PhysicalPtr copyWithNewChildren(
+      const std::vector<PhysicalPtr> &new_children) const override {
+    DCHECK_EQ(1u, new_children.size());
+    return Create(new_children.front(), file_name_, options_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override {
+    return {};
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override {
+    return input_->getOutputAttributes();
+  }
+
+  bool maybeCopyWithPrunedExpressions(
+      const expressions::UnorderedNamedExpressionSet &referenced_expressions,
+      PhysicalPtr *output) const override {
+    return false;
+  }
+
+  /**
+   * @brief Creates a CopyTo physical node.
+   *
+   * @param input The input relation whose data is to be exported.
+   * @param file_name The name of the file to write the data to.
+   * @param options The options for this COPY TO statement.
+   * @return An immutable CopyTo physical node.
+   */
+  static CopyToPtr Create(const PhysicalPtr &input,
+                          const std::string &file_name,
+                          const BulkIoConfigurationPtr &options) {
+    return CopyToPtr(new CopyTo(input, file_name, options));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  CopyTo(const PhysicalPtr &input,
+         const std::string &file_name,
+         const BulkIoConfigurationPtr &options)
+      : input_(input),
+        file_name_(file_name),
+        options_(options) {
+    addChild(input);
+  }
+
+  const PhysicalPtr input_;
+  const std::string file_name_;
+  const BulkIoConfigurationPtr options_;
+
+  DISALLOW_COPY_AND_ASSIGN(CopyTo);
+};
+
+/** @} */
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_COPY_TO_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/physical/PhysicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PhysicalType.hpp b/query_optimizer/physical/PhysicalType.hpp
index 47db7ec..0a965af 100644
--- a/query_optimizer/physical/PhysicalType.hpp
+++ b/query_optimizer/physical/PhysicalType.hpp
@@ -32,8 +32,9 @@ namespace physical {
  * @brief Optimizer physical node types.
  **/
 enum class PhysicalType {
-  kAggregate,
+  kAggregate = 0,
   kCopyFrom,
+  kCopyTo,
   kCreateIndex,
   kCreateTable,
   kCrossReferenceCoalesceAggregate,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt
index 4e364a6..6feb1e8 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -97,6 +97,7 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
                       quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       quickstep_queryoptimizer_logical_Aggregate
                       quickstep_queryoptimizer_logical_CopyFrom
+                      quickstep_queryoptimizer_logical_CopyTo
                       quickstep_queryoptimizer_logical_CreateIndex
                       quickstep_queryoptimizer_logical_CreateTable
                       quickstep_queryoptimizer_logical_DeleteTuples
@@ -131,6 +132,7 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
                       quickstep_types_operations_unaryoperations_DateExtractOperation
                       quickstep_types_operations_unaryoperations_SubstringOperation
                       quickstep_types_operations_unaryoperations_UnaryOperation
+                      quickstep_utility_BulkIoConfiguration
                       quickstep_utility_Macros
                       quickstep_utility_PtrList
                       quickstep_utility_PtrVector

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index 0f65255..935e235 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -93,6 +93,7 @@
 #include "query_optimizer/expressions/WindowAggregateFunction.hpp"
 #include "query_optimizer/logical/Aggregate.hpp"
 #include "query_optimizer/logical/CopyFrom.hpp"
+#include "query_optimizer/logical/CopyTo.hpp"
 #include "query_optimizer/logical/CreateIndex.hpp"
 #include "query_optimizer/logical/CreateTable.hpp"
 #include "query_optimizer/logical/DeleteTuples.hpp"
@@ -126,6 +127,7 @@
 #include "types/operations/unary_operations/DateExtractOperation.hpp"
 #include "types/operations/unary_operations/SubstringOperation.hpp"
 #include "types/operations/unary_operations/UnaryOperation.hpp"
+#include "utility/BulkIoConfiguration.hpp"
 #include "utility/PtrList.hpp"
 #include "utility/PtrVector.hpp"
 #include "utility/SqlError.hpp"
@@ -143,6 +145,45 @@ namespace E = ::quickstep::optimizer::expressions;
 namespace L = ::quickstep::optimizer::logical;
 namespace S = ::quickstep::serialization;
 
+namespace {
+
+attribute_id GetAttributeIdFromName(
+    const PtrList<ParseAttributeDefinition> &attribute_definition_list,
+    const std::string &attribute_name) {
+  const std::string lower_attribute_name = ToLower(attribute_name);
+
+  attribute_id attr_id = 0;
+  for (const ParseAttributeDefinition &attribute_definition : attribute_definition_list) {
+    if (lower_attribute_name == ToLower(attribute_definition.name()->value())) {
+      return attr_id;
+    }
+
+    ++attr_id;
+  }
+
+  return kInvalidAttributeID;
+}
+
+const ParseString* GetKeyValueString(const ParseKeyValue &key_value) {
+  if (key_value.getKeyValueType() != ParseKeyValue::kStringString) {
+      THROW_SQL_ERROR_AT(&key_value)
+          << "Invalid value type for " << key_value.key()->value()
+          << ", expected a string.";
+  }
+  return static_cast<const ParseKeyStringValue&>(key_value).value();
+}
+
+bool GetKeyValueBool(const ParseKeyValue &key_value) {
+  if (key_value.getKeyValueType() != ParseKeyValue::kStringBool) {
+      THROW_SQL_ERROR_AT(&key_value)
+          << "Invalid value for " << key_value.key()->value()
+          << ", expected true or false.";
+  }
+  return static_cast<const ParseKeyBoolValue&>(key_value).value();
+}
+
+}  // namespace
+
 struct Resolver::ExpressionResolutionInfo {
   /**
    * @brief Constructs an ExpressionResolutionInfo that disallows aggregate
@@ -316,11 +357,25 @@ struct Resolver::SelectListInfo {
 
 L::LogicalPtr Resolver::resolve(const ParseStatement &parse_query) {
   switch (parse_query.getStatementType()) {
-    case ParseStatement::kCopyFrom:
-      context_->set_is_catalog_changed();
-      logical_plan_ = resolveCopyFrom(
-          static_cast<const ParseStatementCopyFrom&>(parse_query));
+    case ParseStatement::kCopy: {
+      const ParseStatementCopy &copy_statemnt =
+          static_cast<const ParseStatementCopy&>(parse_query);
+      if (copy_statemnt.getCopyDirection() == ParseStatementCopy::kFrom) {
+        context_->set_is_catalog_changed();
+        logical_plan_ = resolveCopyFrom(copy_statemnt);
+      } else {
+        DCHECK(copy_statemnt.getCopyDirection() == ParseStatementCopy::kTo);
+        if (copy_statemnt.with_clause() != nullptr) {
+          resolveWithClause(*copy_statemnt.with_clause());
+        }
+        logical_plan_ = resolveCopyTo(copy_statemnt);
+
+        if (copy_statemnt.with_clause() != nullptr) {
+          reportIfWithClauseUnused(*copy_statemnt.with_clause());
+        }
+      }
       break;
+    }
     case ParseStatement::kCreateTable:
       context_->set_is_catalog_changed();
       logical_plan_ = resolveCreateTable(
@@ -359,16 +414,7 @@ L::LogicalPtr Resolver::resolve(const ParseStatement &parse_query) {
         logical_plan_ = resolveInsertSelection(insert_selection_statement);
 
         if (insert_selection_statement.with_clause() != nullptr) {
-          // Report an error if there is a WITH query that is not actually used.
-          if (!with_queries_info_.unreferenced_query_indexes.empty()) {
-            int unreferenced_with_query_index = *with_queries_info_.unreferenced_query_indexes.begin();
-            const ParseSubqueryTableReference &unreferenced_with_query =
-                (*insert_selection_statement.with_clause())[unreferenced_with_query_index];
-            THROW_SQL_ERROR_AT(&unreferenced_with_query)
-                << "WITH query "
-                << unreferenced_with_query.table_reference_signature()->table_alias()->value()
-                << " is defined but not used";
-          }
+          reportIfWithClauseUnused(*insert_selection_statement.with_clause());
         }
       }
       break;
@@ -385,16 +431,7 @@ L::LogicalPtr Resolver::resolve(const ParseStatement &parse_query) {
                               nullptr /* type_hints */,
                               nullptr /* parent_resolver */);
       if (set_operation_statement.with_clause() != nullptr) {
-        // Report an error if there is a WITH query that is not actually used.
-        if (!with_queries_info_.unreferenced_query_indexes.empty()) {
-          int unreferenced_with_query_index = *with_queries_info_.unreferenced_query_indexes.begin();
-          const ParseSubqueryTableReference &unreferenced_with_query =
-              (*set_operation_statement.with_clause())[unreferenced_with_query_index];
-          THROW_SQL_ERROR_AT(&unreferenced_with_query)
-              << "WITH query "
-              << unreferenced_with_query.table_reference_signature()->table_alias()->value()
-              << " is defined but not used";
-        }
+        reportIfWithClauseUnused(*set_operation_statement.with_clause());
       }
       break;
     }
@@ -418,27 +455,156 @@ L::LogicalPtr Resolver::resolve(const ParseStatement &parse_query) {
 }
 
 L::LogicalPtr Resolver::resolveCopyFrom(
-    const ParseStatementCopyFrom &copy_from_statement) {
-  // Default parameters.
-  std::string column_delimiter_ = "\t";
-  bool escape_strings_ = true;
+    const ParseStatementCopy &copy_from_statement) {
+  DCHECK(copy_from_statement.getCopyDirection() == ParseStatementCopy::kFrom);
+  const PtrList<ParseKeyValue> *params = copy_from_statement.params();
 
-  const ParseCopyFromParams *params = copy_from_statement.params();
+  BulkIoFormat file_format = BulkIoFormat::kText;
   if (params != nullptr) {
-    if (params->delimiter != nullptr) {
-      column_delimiter_ = params->delimiter->value();
-      if (column_delimiter_.size() != 1) {
-        THROW_SQL_ERROR_AT(params->delimiter)
-            << "DELIMITER is not a single character";
+    for (const ParseKeyValue &param : *params) {
+      const std::string &key = ToLower(param.key()->value());
+      if (key == "format") {
+        const ParseString *parse_format = GetKeyValueString(param);
+        const std::string format = ToLower(parse_format->value());
+        // TODO(jianqiao): Support other bulk load formats such as CSV.
+        if (format != "text") {
+          THROW_SQL_ERROR_AT(parse_format) << "Unsupported file format: " << format;
+        }
+        // Update file_format when other formats get supported.
+        break;
+      }
+    }
+  }
+
+  std::unique_ptr<BulkIoConfiguration> options =
+      std::make_unique<BulkIoConfiguration>(file_format);
+  if (params != nullptr) {
+    for (const ParseKeyValue &param : *params) {
+      const std::string key = ToLower(param.key()->value());
+      if (key == "delimiter") {
+        const ParseString *parse_delimiter = GetKeyValueString(param);
+        const std::string &delimiter = parse_delimiter->value();
+        if (delimiter.size() != 1u) {
+          THROW_SQL_ERROR_AT(parse_delimiter)
+              << "DELIMITER is not a single character";
+        }
+        options->setDelimiter(delimiter.front());
+      } else if (key == "escape_strings") {
+        options->setEscapeStrings(GetKeyValueBool(param));
+      } else if (key != "format") {
+        THROW_SQL_ERROR_AT(&param) << "Unsupported copy option: " << key;
       }
     }
-    escape_strings_ = params->escape_strings;
   }
 
   return L::CopyFrom::Create(resolveRelationName(copy_from_statement.relation_name()),
-                             copy_from_statement.source_filename()->value(),
-                             column_delimiter_[0],
-                             escape_strings_);
+                             copy_from_statement.file_name()->value(),
+                             BulkIoConfigurationPtr(options.release()));
+}
+
+L::LogicalPtr Resolver::resolveCopyTo(
+    const ParseStatementCopy &copy_to_statement) {
+  DCHECK(copy_to_statement.getCopyDirection() == ParseStatementCopy::kTo);
+  const PtrList<ParseKeyValue> *params = copy_to_statement.params();
+
+  // Check if copy format is explicitly specified.
+  BulkIoFormat file_format = BulkIoFormat::kText;
+  bool format_specified = false;
+  if (params != nullptr) {
+    for (const ParseKeyValue &param : *params) {
+      const std::string &key = ToLower(param.key()->value());
+      if (key == "format") {
+        const ParseString *parse_format = GetKeyValueString(param);
+        const std::string format = ToLower(parse_format->value());
+        if (format == "csv") {
+          file_format = BulkIoFormat::kCsv;
+        } else if (format == "text") {
+          file_format = BulkIoFormat::kText;
+        } else {
+          THROW_SQL_ERROR_AT(parse_format) << "Unsupported file format: " << format;
+        }
+        format_specified = true;
+        break;
+      }
+    }
+  }
+
+  const std::string &file_name = copy_to_statement.file_name()->value();
+  if (file_name.length() <= 1u) {
+    THROW_SQL_ERROR_AT(copy_to_statement.file_name())
+        << "File name can not be empty";
+  }
+
+  // Infer copy format from file name extension.
+  if (!format_specified) {
+    if (file_name.length() > 4u) {
+      if (ToLower(file_name.substr(file_name.length() - 4)) == ".csv") {
+        file_format = BulkIoFormat::kCsv;
+      }
+    }
+  }
+
+  // Resolve the copy options.
+  std::unique_ptr<BulkIoConfiguration> options =
+      std::make_unique<BulkIoConfiguration>(file_format);
+  if (params != nullptr) {
+    for (const ParseKeyValue &param : *params) {
+      const std::string key = ToLower(param.key()->value());
+      if (key == "delimiter") {
+        const ParseString *parse_delimiter = GetKeyValueString(param);
+        const std::string &delimiter = parse_delimiter->value();
+        if (delimiter.size() != 1u) {
+          THROW_SQL_ERROR_AT(parse_delimiter)
+              << "DELIMITER is not a single character";
+        }
+        options->setDelimiter(delimiter.front());
+      } else if (file_format == BulkIoFormat::kText && key == "escape_strings") {
+        options->setEscapeStrings(GetKeyValueBool(param));
+      } else if (file_format == BulkIoFormat::kCsv && key == "header") {
+        options->setHeader(GetKeyValueBool(param));
+      } else if (file_format == BulkIoFormat::kCsv && key == "quote") {
+        const ParseString *parse_quote = GetKeyValueString(param);
+        const std::string &quote = parse_quote->value();
+        if (quote.size() != 1u) {
+          THROW_SQL_ERROR_AT(parse_quote)
+              << "QUOTE is not a single character";
+        }
+        options->setQuoteCharacter(quote.front());
+      } else if (key == "null_string") {
+        const ParseString *parse_null_string = GetKeyValueString(param);
+        options->setNullString(parse_null_string->value());
+      } else if (key != "format") {
+        THROW_SQL_ERROR_AT(&param)
+            << "Unsupported copy option \"" << key
+            << "\" for file format " << options->getFormatName();
+      }
+    }
+  }
+
+  // Resolve the source relation.
+  L::LogicalPtr input;
+  if (copy_to_statement.set_operation_query() != nullptr) {
+    input = resolveSetOperation(*copy_to_statement.set_operation_query(),
+                                "" /* set_operation_name */,
+                                nullptr /* type_hints */,
+                                nullptr /* parent_resolver */);
+  } else {
+    const ParseString *relation_name = copy_to_statement.relation_name();
+    DCHECK(relation_name != nullptr);
+    ParseSimpleTableReference table_reference(
+        relation_name->line_number(),
+        relation_name->column_number(),
+        new ParseString(relation_name->line_number(),
+                        relation_name->column_number(),
+                        relation_name->value()),
+        nullptr /* sample */);
+    NameResolver name_resolver;
+    input = resolveTableReference(table_reference, &name_resolver);
+  }
+
+  return L::CopyTo::Create(input,
+                           copy_to_statement.file_name()->value(),
+                           BulkIoConfigurationPtr(options.release()));
 }
 
 L::LogicalPtr Resolver::resolveCreateTable(
@@ -491,26 +657,6 @@ L::LogicalPtr Resolver::resolveCreateTable(
   return L::CreateTable::Create(relation_name, attributes, block_properties, partition_scheme_header_proto);
 }
 
-namespace {
-
-attribute_id GetAttributeIdFromName(const PtrList<ParseAttributeDefinition> &attribute_definition_list,
-                                    const std::string &attribute_name) {
-  const std::string lower_attribute_name = ToLower(attribute_name);
-
-  attribute_id attr_id = 0;
-  for (const ParseAttributeDefinition &attribute_definition : attribute_definition_list) {
-    if (lower_attribute_name == ToLower(attribute_definition.name()->value())) {
-      return attr_id;
-    }
-
-    ++attr_id;
-  }
-
-  return kInvalidAttributeID;
-}
-
-}  // namespace
-
 StorageBlockLayoutDescription* Resolver::resolveBlockProperties(
     const ParseStatementCreateTable &create_table_statement) {
   const ParseBlockProperties *block_properties
@@ -1595,6 +1741,20 @@ void Resolver::appendProjectIfNeedPrecomputationAfterAggregation(
   }
 }
 
+void Resolver::reportIfWithClauseUnused(
+    const PtrVector<ParseSubqueryTableReference> &with_list) const {
+  if (!with_queries_info_.unreferenced_query_indexes.empty()) {
+    const int unreferenced_with_query_index =
+        *with_queries_info_.unreferenced_query_indexes.begin();
+    const ParseSubqueryTableReference &unreferenced_with_query =
+        with_list[unreferenced_with_query_index];
+    THROW_SQL_ERROR_AT(&unreferenced_with_query)
+        << "WITH query "
+        << unreferenced_with_query.table_reference_signature()->table_alias()->value()
+        << " is defined but not used";
+  }
+}
+
 void Resolver::validateSelectExpressionsForAggregation(
     const ParseSelectionClause &parse_selection,
     const std::vector<E::NamedExpressionPtr> &select_list_expressions,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp
index 1ae565a..1784782 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -55,7 +55,7 @@ class ParseSimpleCaseExpression;
 class ParseSimpleTableReference;
 class ParseSubqueryTableReference;
 class ParseStatement;
-class ParseStatementCopyFrom;
+class ParseStatementCopy;
 class ParseStatementCreateTable;
 class ParseStatementCreateIndex;
 class ParseStatementDelete;
@@ -283,7 +283,16 @@ class Resolver {
    * @return A logical plan for the COPY FROM query.
    */
   logical::LogicalPtr resolveCopyFrom(
-      const ParseStatementCopyFrom &copy_from_statement);
+      const ParseStatementCopy &copy_from_statement);
+
+  /**
+   * @brief Resolves a COPY TO query and returns a logical plan.
+   *
+   * @param copy_to_statement The COPY TO parse tree.
+   * @return A logical plan for the COPY TO query.
+   */
+  logical::LogicalPtr resolveCopyTo(
+      const ParseStatementCopy &copy_to_statement);
 
   /**
    * @brief Resolves a UPDATE query and returns a logical plan.
@@ -621,6 +630,14 @@ class Resolver {
   static std::string GenerateOrderingAttributeAlias(int index);
 
   /**
+   * @brief Reports an error if there is a WITH query that is not actually used.
+   *
+   * @param with_list The list of subqueries in WITH clause.
+   */
+  void reportIfWithClauseUnused(
+      const PtrVector<ParseSubqueryTableReference> &with_list) const;
+
+  /**
    * @brief Validates each SELECT-list expression to ensure that it does not
    *        reference a named expression with an ID not in \p valid_expr_id_set.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/strategy/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/strategy/CMakeLists.txt b/query_optimizer/strategy/CMakeLists.txt
index e011126..20a4eb4 100644
--- a/query_optimizer/strategy/CMakeLists.txt
+++ b/query_optimizer/strategy/CMakeLists.txt
@@ -76,6 +76,7 @@ target_link_libraries(quickstep_queryoptimizer_strategy_OneToOne
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExpressionUtil
                       quickstep_queryoptimizer_logical_CopyFrom
+                      quickstep_queryoptimizer_logical_CopyTo
                       quickstep_queryoptimizer_logical_CreateIndex
                       quickstep_queryoptimizer_logical_CreateTable
                       quickstep_queryoptimizer_logical_DeleteTuples
@@ -95,6 +96,7 @@ target_link_libraries(quickstep_queryoptimizer_strategy_OneToOne
                       quickstep_queryoptimizer_logical_WindowAggregate
                       quickstep_queryoptimizer_physical_Aggregate
                       quickstep_queryoptimizer_physical_CopyFrom
+                      quickstep_queryoptimizer_physical_CopyTo
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
                       quickstep_queryoptimizer_physical_DeleteTuples

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/strategy/OneToOne.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/strategy/OneToOne.cpp b/query_optimizer/strategy/OneToOne.cpp
index af4e150..3cfe013 100644
--- a/query_optimizer/strategy/OneToOne.cpp
+++ b/query_optimizer/strategy/OneToOne.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
 #include "query_optimizer/logical/CopyFrom.hpp"
+#include "query_optimizer/logical/CopyTo.hpp"
 #include "query_optimizer/logical/CreateIndex.hpp"
 #include "query_optimizer/logical/CreateTable.hpp"
 #include "query_optimizer/logical/DeleteTuples.hpp"
@@ -45,6 +46,7 @@
 #include "query_optimizer/logical/WindowAggregate.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
+#include "query_optimizer/physical/CopyTo.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
@@ -104,19 +106,28 @@ bool OneToOne::generatePlan(const L::LogicalPtr &logical_input,
     case L::LogicalType::kCopyFrom: {
       const L::CopyFromPtr copy_from =
           std::static_pointer_cast<const L::CopyFrom>(logical_input);
-      *physical_output = P::CopyFrom::Create(
-          copy_from->catalog_relation(), copy_from->file_name(),
-          copy_from->column_delimiter(), copy_from->escape_strings());
+      *physical_output = P::CopyFrom::Create(copy_from->catalog_relation(),
+                                             copy_from->file_name(),
+                                             copy_from->options());
+      return true;
+    }
+    case L::LogicalType::kCopyTo: {
+      const L::CopyToPtr copy_to =
+          std::static_pointer_cast<const L::CopyTo>(logical_input);
+      *physical_output = P::CopyTo::Create(
+          physical_mapper_->createOrGetPhysicalFromLogical(copy_to->input()),
+          copy_to->file_name(),
+          copy_to->options());
       return true;
     }
     case L::LogicalType::kCreateIndex: {
       const L::CreateIndexPtr create_index =
           std::static_pointer_cast<const L::CreateIndex>(logical_input);
-      *physical_output = P::CreateIndex::Create(physical_mapper_->createOrGetPhysicalFromLogical(
-                                                                    create_index->input()),
-                                                create_index->index_name(),
-                                                create_index->index_attributes(),
-                                                create_index->index_description());
+      *physical_output = P::CreateIndex::Create(
+          physical_mapper_->createOrGetPhysicalFromLogical(create_index->input()),
+          create_index->index_name(),
+          create_index->index_attributes(),
+          create_index->index_description());
       return true;
     }
     case L::LogicalType::kCreateTable: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
index ee9bee7..050ef0d 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp
@@ -62,6 +62,9 @@ void ExecutionGeneratorTestRunner::runTestCase(
   MemStream output_stream;
   sql_parser_.feedNextBuffer(new std::string(input));
 
+  // Redirect stderr to output_stream.
+  stderr = output_stream.file();
+
   while (true) {
     ParseResult result = sql_parser_.getNextStatement();
     if (result.condition != ParseResult::kSuccess) {
@@ -71,8 +74,6 @@ void ExecutionGeneratorTestRunner::runTestCase(
       break;
     } else {
       const ParseStatement &parse_statement = *result.parsed_statement;
-      std::printf("%s\n", parse_statement.toString().c_str());
-
       const CatalogRelation *query_result_relation = nullptr;
       try {
         OptimizerContext optimizer_context;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 09a7647..ebcb0b6 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -20,6 +20,11 @@ add_test(quickstep_queryoptimizer_tests_executiongenerator_commonsubexpression
          "${CMAKE_CURRENT_SOURCE_DIR}/CommonSubexpression.test"
          "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression.test"
          "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression/")
+add_test(quickstep_queryoptimizer_tests_executiongenerator_copy
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Copy.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Copy.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Copy/")
 add_test(quickstep_queryoptimizer_tests_executiongenerator_create
          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -158,6 +163,7 @@ endif(ENABLE_DISTRIBUTED)
 # Create the folders where the unit tests will store their data blocks for the
 # duration of their test.
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Copy)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/tests/execution_generator/Copy.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Copy.test b/query_optimizer/tests/execution_generator/Copy.test
new file mode 100644
index 0000000..fa892c1
--- /dev/null
+++ b/query_optimizer/tests/execution_generator/Copy.test
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+CREATE TABLE source (
+  int_col INT NULL,
+  date_col DATE NULL,
+  char_col CHAR(16),
+  varchar_col VARCHAR(16)
+);
+
+INSERT INTO source VALUES(1, '2000-01-01', 'aa', 'aaa');
+INSERT INTO source VALUES(2, '2000-02-02', 'bb', 'bbb');
+INSERT INTO source VALUES(3, '2000-03-03', 'cc', 'ccc');
+INSERT INTO source VALUES(4, '2000-04-04', 'aa', 'ddd');
+INSERT INTO source VALUES(5, '2000-05-05', 'bb', 'eee');
+INSERT INTO source VALUES(6, '2000-06-06', 'cc', 'fff');
+
+COPY source TO stderr WITH (DELIMITER '|');
+--
+1|2000-01-01|aa|aaa
+2|2000-02-02|bb|bbb
+3|2000-03-03|cc|ccc
+4|2000-04-04|aa|ddd
+5|2000-05-05|bb|eee
+6|2000-06-06|cc|fff
+==
+
+COPY
+  SELECT char_col, SUM(int_col)
+  FROM source
+  GROUP BY char_col
+TO stderr;
+--
+aa	5
+bb	7
+cc	9
+==
+
+COPY
+  SELECT * FROM (
+    SELECT -int_col * 1000, 'Negative' FROM source
+    UNION ALL
+    SELECT int_col * 1000, 'Positive' FROM source
+  ) AS t(VALUE, SIGN)
+  ORDER BY value
+TO stderr WITH (FORMAT 'CSV', DELIMITER e'\t');
+--
+VALUE	SIGN
+-6000	Negative
+-5000	Negative
+-4000	Negative
+-3000	Negative
+-2000	Negative
+-1000	Negative
+1000	Positive
+2000	Positive
+3000	Positive
+4000	Positive
+5000	Positive
+6000	Positive
+==
+
+# WITH clause.
+WITH r(x, y) AS (
+  SELECT i, i + 1
+  FROM generate_series(0, 9) AS g(i)
+)
+COPY
+  SELECT x * y AS value FROM r ORDER BY value
+TO stderr;
+--
+0
+2
+6
+12
+20
+30
+42
+56
+72
+90
+==
+
+
+# Test handling of NULL values and special characters.
+DELETE FROM source;
+INSERT INTO source VALUES(1, '2000-01-01', 'abc', 'def');
+INSERT INTO source VALUES(2, '2000-02-02', e'a\ta', '|,|');
+INSERT INTO source VALUES(NULL, NULL, e'b\nb', '"""');
+
+COPY source TO stderr;
+--
+1	2000-01-01	abc	def
+2	2000-02-02	a\ta	|,|
+\N	\N	b\nb	"""
+==
+
+COPY source TO stderr WITH (FORMAT 'CSV', HEADER FALSE);
+--
+1,2000-01-01,abc,def
+2,2000-02-02,a	a,"|,|"
+,,"b
+b",""""""""
+==
+
+COPY source TO stderr WITH (FORMAT 'CSV', DELIMITER '|', NULL_STRING '.na', HEADER TRUE);
+--
+int_col|date_col|char_col|varchar_col
+1|2000-01-01|abc|def
+2|2000-02-02|a	a|"|,|"
+.na|.na|"b
+b"|""""""""
+==

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/tests/physical_generator/Copy.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/physical_generator/Copy.test b/query_optimizer/tests/physical_generator/Copy.test
index 2f66415..3d76373 100644
--- a/query_optimizer/tests/physical_generator/Copy.test
+++ b/query_optimizer/tests/physical_generator/Copy.test
@@ -26,7 +26,7 @@ TopLevelPlan
   +-[]
 [Physical Plan]
 TopLevelPlan
-+-plan=CopyFrom[relation=Test,file_name=test.txt,column_delimiter=	,
++-plan=CopyFrom[relation=Test,file_name=test.txt,column_delimiter="\t",
 | escape_strings=true]
 +-output_attributes=
   +-[]
@@ -42,7 +42,7 @@ TopLevelPlan
   +-[]
 [Physical Plan]
 TopLevelPlan
-+-plan=CopyFrom[relation=Test,file_name=test.txt,column_delimiter=d,
++-plan=CopyFrom[relation=Test,file_name=test.txt,column_delimiter="d",
 | escape_strings=false]
 +-output_attributes=
   +-[]

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29816511/query_optimizer/tests/resolver/Copy.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/resolver/Copy.test b/query_optimizer/tests/resolver/Copy.test
index c2ae91a..d68e18f 100644
--- a/query_optimizer/tests/resolver/Copy.test
+++ b/query_optimizer/tests/resolver/Copy.test
@@ -16,7 +16,7 @@
 # under the License.
 
 [default initial_logical_plan]
-copy test from 'test.txt'
+COPY test FROM 'test.txt'
 --
 TopLevelPlan
 +-plan=CopyFrom[relation=Test,file_name=test.txt,column_delimiter="\t",
@@ -25,14 +25,14 @@ TopLevelPlan
   +-[]
 ==
 
-copy tESt from 'test.txt' with (delimiter '123')
+COPY tESt FROM 'test.txt' WITH (delimiter '123')
 --
 ERROR: DELIMITER is not a single character (1 : 43)
-copy tESt from 'test.txt' with (delimiter '123')
+COPY tESt FROM 'test.txt' WITH (delimiter '123')
                                           ^
 ==
 
-copy tESt from 'test.txt' with (delimiter 'd', escape_strings false)
+COPY tESt FROM 'test.txt' WITH (delimiter 'd', escape_strings false)
 --
 TopLevelPlan
 +-plan=CopyFrom[relation=Test,file_name=test.txt,column_delimiter="d",
@@ -41,8 +41,160 @@ TopLevelPlan
   +-[]
 ==
 
-copy undefined_table from 'test.txt'
+COPY test TO 'test.txt';
+--
+TopLevelPlan
++-plan=CopyTo[file_name=@test.txt,format=TEXT,column_delimiter="\t",
+| escape_strings=true,null_string=\N]
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+|   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+|   +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+|   +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+|   +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
++-output_attributes=
+  +-[]
+==
+
+COPY test TO stdout WITH (FORMAT 'CSV');
+--
+TopLevelPlan
++-plan=CopyTo[file_name=$stdout,format=CSV,column_delimiter=",",header=true,
+| quote="]
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+|   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+|   +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+|   +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+|   +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
++-output_attributes=
+  +-[]
+==
+
+COPY
+  SELECT SUM(int_col) AS sum_int,
+         AVG(double_col) AS avg_dbl
+  FROM test
+  GROUP BY char_col
+  UNION ALL
+  SELECT 1, 2.0
+  FROM generate_series(1, 1)
+TO 'test.txt' WITH (DELIMITER ',');
+--
+TopLevelPlan
++-plan=CopyTo[file_name=@test.txt,format=TEXT,column_delimiter=",",
+| escape_strings=true,null_string=\N]
+| +-input=UnionAll[set_operation_type=UnionAll]
+|   +-operands=
+|   | +-Project
+|   | | +-input=Project
+|   | | | +-input=Aggregate
+|   | | | | +-input=TableReference[relation_name=Test,relation_alias=test]
+|   | | | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   | | | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|   | | | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+|   | | | | | +-AttributeReference[id=3,name=double_col,relation=test,
+|   | | | | | | type=Double NULL]
+|   | | | | | +-AttributeReference[id=4,name=char_col,relation=test,
+|   | | | | | | type=Char(20)]
+|   | | | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+|   | | | | |   type=VarChar(20) NULL]
+|   | | | | +-grouping_expressions=
+|   | | | | | +-AttributeReference[id=4,name=char_col,relation=test,
+|   | | | | |   type=Char(20)]
+|   | | | | +-aggregate_expressions=
+|   | | | |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   | | | |   | type=Long NULL]
+|   | | | |   | +-AggregateFunction[function=SUM]
+|   | | | |   |   +-AttributeReference[id=0,name=int_col,relation=test,
+|   | | | |   |     type=Int NULL]
+|   | | | |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|   | | | |     type=Double NULL]
+|   | | | |     +-AggregateFunction[function=AVG]
+|   | | | |       +-AttributeReference[id=3,name=double_col,relation=test,
+|   | | | |         type=Double NULL]
+|   | | | +-project_list=
+|   | | |   +-Alias[id=6,name=sum_int,relation=,type=Long NULL]
+|   | | |   | +-AttributeReference[id=6,name=,alias=$aggregate0,
+|   | | |   |   relation=$aggregate,type=Long NULL]
+|   | | |   +-Alias[id=7,name=avg_dbl,relation=,type=Double NULL]
+|   | | |     +-AttributeReference[id=7,name=,alias=$aggregate1,
+|   | | |       relation=$aggregate,type=Double NULL]
+|   | | +-project_list=
+|   | |   +-AttributeReference[id=6,name=sum_int,relation=,type=Long NULL]
+|   | |   +-AttributeReference[id=7,name=avg_dbl,relation=,type=Double NULL]
+|   | +-Project
+|   |   +-input=Project
+|   |   | +-input=TableGenerator[function_name=generate_series]
+|   |   | | +-AttributeReference[id=8,name=generate_series,
+|   |   | |   relation=generate_series,type=Int]
+|   |   | +-project_list=
+|   |   |   +-Alias[id=9,name=,alias=1,relation=,type=Int]
+|   |   |   | +-Literal[value=1,type=Int]
+|   |   |   +-Alias[id=10,name=,alias=2.0,relation=,type=Double]
+|   |   |     +-Literal[value=2,type=Double]
+|   |   +-project_list=
+|   |     +-Alias[id=11,name=,alias=1,relation=,type=Long NULL]
+|   |     | +-Cast[target_type=Long NULL]
+|   |     |   +-operand=AttributeReference[id=9,name=,alias=1,relation=,type=Int]
+|   |     +-Alias[id=12,name=,alias=2.0,relation=,type=Double NULL]
+|   |       +-Cast[target_type=Double NULL]
+|   |         +-operand=AttributeReference[id=10,name=,alias=2.0,relation=,
+|   |           type=Double]
+|   +-project_attributes=
+|     +-AttributeReference[id=13,name=sum_int,relation=,type=Long NULL]
+|     +-AttributeReference[id=14,name=avg_dbl,relation=,type=Double NULL]
++-output_attributes=
+  +-[]
+==
+
+COPY undefined_table FROM 'test.txt'
 --
 ERROR: Unrecognized relation undefined_table (1 : 6)
-copy undefined_table from 'test.txt...
+COPY undefined_table FROM 'test.txt...
      ^
+==
+
+COPY test FROM 'test.txt' WITH (FORMAT 'CSV')
+--
+ERROR: Unsupported file format: csv (1 : 40)
+COPY test FROM 'test.txt' WITH (FORMAT 'CSV')
+                                       ^
+==
+
+COPY test FROM 'test.txt' WITH (XXX 'YY');
+--
+ERROR: Unsupported copy option: xxx (1 : 33)
+COPY test FROM 'test.txt' WITH (XXX 'YY');
+                                ^
+==
+
+COPY test TO 'test.txt' WITH (QUOTE '$');
+--
+ERROR: Unsupported copy option "quote" for file format TEXT (1 : 31)
+COPY test TO 'test.txt' WITH (QUOTE '$');
+                              ^
+==
+
+COPY test TO 'test.txt' WITH (FORMAT 'CSV', ESCAPE_STRINGS TRUE);
+--
+ERROR: Unsupported copy option "escape_strings" for file format CSV (1 : 45)
+... test TO 'test.txt' WITH (FORMAT 'CSV', ESCAPE_STRINGS TRUE);
+                                           ^
+==
+
+COPY test TO 'test.txt' WITH (FORMAT CSV, QUOTE '$$');
+--
+ERROR: QUOTE is not a single character (1 : 49)
+...test TO 'test.txt' WITH (FORMAT CSV, QUOTE '$$');
+                                              ^
+==
+
+COPY test TO 'test.txt' WITH (FORMAT 'TEXT', QUOTE '"');
+--
+ERROR: Unsupported copy option "quote" for file format TEXT (1 : 46)
+...test TO 'test.txt' WITH (FORMAT 'TEXT', QUOTE '"');
+                                           ^
+==