You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/27 02:02:02 UTC

[12/14] impala git commit: IMPALA-7869: break up parquet-column-readers.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
deleted file mode 100644
index d559b8e..0000000
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ /dev/null
@@ -1,654 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXEC_HDFS_PARQUET_SCANNER_H
-#define IMPALA_EXEC_HDFS_PARQUET_SCANNER_H
-
-#include "codegen/impala-ir.h"
-#include "exec/hdfs-scanner.h"
-#include "exec/parquet-common.h"
-#include "exec/parquet-scratch-tuple-batch.h"
-#include "exec/parquet-metadata-utils.h"
-#include "runtime/scoped-buffer.h"
-#include "util/runtime-profile-counters.h"
-
-namespace impala {
-
-class CollectionValueBuilder;
-struct HdfsFileDesc;
-
-/// Internal schema representation and resolution.
-struct SchemaNode;
-
-/// Class that implements Parquet definition and repetition level decoding.
-class ParquetLevelDecoder;
-
-/// Per column reader.
-class ParquetColumnReader;
-class CollectionColumnReader;
-class BaseScalarColumnReader;
-template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-class ScalarColumnReader;
-class BoolColumnReader;
-
-/// This scanner parses Parquet files located in HDFS, and writes the content as tuples in
-/// the Impala in-memory representation of data, e.g.  (tuples, rows, row batches).
-/// For the file format spec, see: github.com/apache/parquet-format
-///
-/// ---- Schema resolution ----
-/// Additional columns are allowed at the end in either the table or file schema (i.e.,
-/// extra columns at the end of the schema or extra fields at the end of a struct).  If
-/// there are extra columns in the file schema, they are simply ignored. If there are
-/// extra in the table schema, we return NULLs for those columns (if they're
-/// materialized).
-///
-/// ---- Disk IO ----
-/// Parquet (and other columnar formats) use scan ranges differently than other formats.
-/// Each materialized column maps to a single ScanRange per row group.  For streaming
-/// reads, all the columns need to be read in parallel. This is done by issuing one
-/// ScanRange (in IssueInitialRanges()) for the file footer per split.
-/// ProcessSplit() is called once for each original split and determines the row groups
-/// whose midpoints fall within that split. We use the mid-point to determine whether a
-/// row group should be processed because if the row group size is less than or equal to
-/// the split size, the mid point guarantees that we have at least 50% of the row group in
-/// the current split. ProcessSplit() then computes the column ranges for these row groups
-/// and submits them to the IoMgr for immediate scheduling (so they don't surface in
-/// RequestContext::GetNextUnstartedRange()). Scheduling them immediately also guarantees
-/// they are all read at once.
-///
-/// Like the other scanners, each parquet scanner object is one to one with a
-/// ScannerContext. Unlike the other scanners though, the context will have multiple
-/// streams, one for each column. Row groups are processed one at a time this way.
-///
-/// ---- Nested types ----
-/// This scanner supports reading and materializing nested data. For a good overview of
-/// how nested data is encoded, see blog.twitter.com/2013/dremel-made-simple-with-parquet.
-/// For how SQL nested schemas are translated to parquet schemas, see
-/// github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types.
-///
-/// Examples:
-/// For these examples, we will use the following table definition:
-/// tbl:
-///   id                bigint
-///   array_col         array<array<int>>
-///
-/// The table definition could correspond to the following parquet schema (note the
-/// required 'id' field. If written by Impala, all non-repeated fields would be optional,
-/// but we can read repeated fields as well):
-///
-/// required group record         d=0 r=0
-///   req int64 id                d=0 r=0
-///   opt group array_col (LIST)  d=1 r=0
-///     repeated group list       d=2 r=1
-///       opt group item (LIST)   d=3 r=1
-///         repeated group list   d=4 r=2
-///           opt int32 item      d=5 r=2
-///
-/// Each element in the schema has been annotated with the maximum def level and maximum
-/// rep level corresponding to that element. Note that the repeated elements add a def
-/// level. This distinguishes between 0 items (empty list) and more than 0 items
-/// (non-empty list). The containing optional LIST element for each array determines
-/// whether the whole list is null or non-null. Maps work the same way, the only
-/// differences being that the repeated group contains two child fields ("key" and "value"
-/// instead of "item"), and the outer element is annotated with MAP instead of LIST.
-///
-/// Only scalar schema elements are materialized in parquet files; internal nested
-/// elements can be reconstructed using the def and rep levels. To illustrate this, here
-/// is data containing every valid definition and repetition for the materialized int
-/// 'item' element. The data records appear on the left, the encoded definition levels,
-/// repetition levels, and values for the 'item' field appear on the right (the encoded
-/// 'id' field is not shown).
-///
-/// record                       d r v
-/// ------------------------------------
-/// {id: 0, array_col: NULL}     0 0 -
-/// {id: 1, array_col: []}       1 0 -
-/// {id: 2, array_col: [NULL]}   2 0 -
-/// {id: 3, array_col: [[]]}     3 0 -
-/// {id: 4, array_col: [[NULL]]} 4 0 -
-/// {id: 5, array_col: [[1,      5 0 1
-///                      NULL],  4 2 -
-///                     [2]]}    5 1 2
-/// {id: 6, array_col: [[3]]}    5 0 3
-///
-/// * Example query 1:
-///     select id, inner.item from tbl t, t.array_col outer, outer.item inner
-///   Results from above sample data:
-///     4,NULL
-///     5,1
-///     5,NULL
-///     5,2
-///     6,3
-///
-/// Descriptors:
-///  Tuple(id=0 tuple_path=[] slots=[
-///    Slot(id=0 type=ARRAY col_path=[1] collection_item_tuple_id=1),
-///    Slot(id=2 type=BIGINT col_path=[0])])
-///  Tuple(id=1 tuple_path=[1] slots=[
-///    Slot(id=1 type=ARRAY col_path=[1,0] collection_item_tuple_id=2)])
-///  Tuple(id=2 tuple_path=[1, 0] slots=[
-///    Slot(id=3 type=INT col_path=[1,0,0])])
-///
-///   The parquet scanner will materialize the following in-memory row batch:
-///          RowBatch
-///        +==========+
-///        | 0 | NULL |
-///        |----------|
-///        | 1 | NULL |      outer
-///        |----------|     +======+
-///        | 2 |  --------->| NULL |
-///        |   |      |     +======+
-///        |----------|
-///        |   |      |     +======+
-///        | 3 |  --------->| NULL |
-///        |   |      |     +======+
-///        |   |      |                  inner
-///        |----------|     +======+    +======+
-///        | 4 |  --------->|  -------->| NULL |
-///        |   |      |     +======+    +======+
-///        |   |      |
-///        |----------|     +======+    +======+
-///        | 5 |  --------->|  -------->|  1   |
-///        |   |      |     |      |    +------+
-///        |   |      |     |      |    | NULL |
-///        |   |      |     +------+    +======+
-///        |   |      |     |      |
-///        |   |      |     |      |    +======+
-///        |   |      |     |  -------->|  2   |
-///        |   |      |     +======+    +======+
-///        |   |      |
-///        |----------|     +======+    +======+
-///        | 6 |  --------->|  -------->|  3   |
-///        +==========+     +======+    +======+
-///
-///   The top-level row batch contains two slots, one containing the int64_t 'id' slot and
-///   the other containing the CollectionValue 'array_col' slot. The CollectionValues in
-///   turn contain pointers to their item tuple data. Each item tuple contains a single
-///   ArrayColumn slot ('array_col.item'). The inner CollectionValues' item tuples contain
-///   a single int 'item' slot.
-///
-///   Note that the scanner materializes a NULL CollectionValue for empty collections.
-///   This is technically a bug (it should materialize a CollectionValue with num_tuples =
-///   0), but we don't distinguish between these two cases yet.
-///   TODO: fix this (IMPALA-2272)
-///
-///   The column readers that materialize this structure form a tree analogous to the
-///   materialized output:
-///     CollectionColumnReader slot_id=0 node="repeated group list (d=2 r=1)"
-///       CollectionColumnReader slot_id=1 node="repeated group list (d=4 r=2)"
-///         ScalarColumnReader<int32_t> slot_id=3 node="opt int32 item (d=5 r=2)"
-///     ScalarColumnReader<int64_t> slot_id=2 node="req int64 id (d=0 r=0)"
-///
-///   Note that the collection column readers reference the "repeated group item" schema
-///   element of the serialized array, not the outer "opt group" element. This is what
-///   causes the bug described above, it should consider both elements.
-///
-/// * Example query 2:
-///     select inner.item from tbl.array_col.item inner;
-///   Results from the above sample data:
-///     NULL
-///     1
-///     NULL
-///     2
-///     3
-///
-///   Descriptors:
-///    Tuple(id=0 tuple_path=[1, 0] slots=[
-///      Slot(id=0 type=INT col_path=[1,0,0])])
-///
-///   In-memory row batch:
-///     +======+
-///     | NULL |
-///     |------|
-///     |  1   |
-///     |------|
-///     | NULL |
-///     |------|
-///     |  2   |
-///     |------|
-///     |  3   |
-///     +======+
-///
-///   Column readers:
-///     ScalarColumnReader<int32_t> slot_id=0 node="opt int32 item (d=5 r=2)"
-///
-///   In this example, the scanner doesn't materialize a nested in-memory result, since
-///   only the single int 'item' slot is materialized. However, it still needs to read the
-///   nested data as shown above. An important point to notice is that a tuple is not
-///   materialized for every rep and def level pair read -- there are 9 of these pairs
-///   total in the sample data above, but only 5 tuples are materialized. This is because
-///   in this case, nothing should be materialized for NULL or empty arrays, since we're
-///   only materializing the innermost item. If a def level is read that doesn't
-///   correspond to any item value (NULL or otherwise), the scanner advances to the next
-///   rep and def levels without materializing a tuple.
-///
-/// * Example query 3:
-///     select id, inner.item from tbl t, t.array_col.item inner
-///   Results from the above sample data (same as example 1):
-///     4,NULL
-///     5,1
-///     5,NULL
-///     5,2
-///     6,3
-///
-///   Descriptors:
-///    Tuple(id=0 tuple_path=[] slots=[
-///      Slot(id=0 type=ARRAY col_path=[2]),
-///      Slot(id=1 type=BIGINT col_path=[0])])
-///    Tuple(id=1 tuple_path=[2, 0] slots=[
-///      Slot(id=2 type=INT col_path=[2,0,0])])
-///
-///   In-memory row batch:
-///       RowBatch
-///     +==========+
-///     | 0 | NULL |
-///     |----------|
-///     | 1 | NULL |
-///     |----------|      inner
-///     | 2 |  --------->+======+
-///     |   |      |     +======+
-///     |----------|
-///     |   |      |
-///     | 3 |  --------->+======+
-///     |   |      |     +======+
-///     |   |      |
-///     |----------|     +======+
-///     | 4 |  --------->| NULL |
-///     |   |      |     +======+
-///     |   |      |
-///     |----------|     +======+
-///     | 5 |  --------->|  1   |
-///     |   |      |     +------+
-///     |   |      |     | NULL |
-///     |   |      |     +------+
-///     |   |      |     |  2   |
-///     |   |      |     +======+
-///     |   |      |
-///     |----------|     +======+
-///     | 6 |  --------->|  3   |
-///     +==========+     +======+
-///
-///   Column readers:
-///     CollectionColumnReader slot_id=0 node="repeated group list (d=2 r=1)"
-///       ScalarColumnReader<int32_t> slot_id=2 node="opt int32 item (d=5 r=2)"
-///     ScalarColumnReader<int32_t> id=1 node="req int64 id (d=0 r=0)"
-///
-///   In this example, the scanner materializes a "flattened" version of inner, rather
-///   than the full 3-level structure. Note that the collection reader references the
-///   outer array, which determines how long each materialized array is, and the items in
-///   the array are from the inner array.
-///
-/// ---- Slot materialization ----
-/// Top-level tuples:
-/// The slots of top-level tuples are populated in a column-wise fashion. Each column
-/// reader materializes a batch of values into a temporary 'scratch batch'. Once a
-/// scratch batch has been fully populated, runtime filters and conjuncts are evaluated
-/// against the scratch tuples, and the surviving tuples are set in the output batch that
-/// is handed to the scan node. The ownership of tuple memory is transferred from a
-/// scratch batch to an output row batch once all tuples in the scratch batch have either
-/// been filtered or returned as part of an output batch.
-///
-/// Collection items:
-/// Unlike the top-level tuples, the item tuples of CollectionValues are populated in
-/// a row-wise fashion because doing it column-wise has the following challenges.
-/// First, we would need to allocate a scratch batch for every collection-typed slot
-/// which could consume a lot of memory. Then we'd need a similar mechanism to transfer
-/// tuples that survive conjuncts to an output collection. However, CollectionValues lack
-/// the row indirection that row batches have, so we would need to either deep copy the
-/// surviving tuples, or come up with a different mechanism altogether.
-/// TODO: Populating CollectionValues in a column-wise fashion seems different enough
-/// and less critical for most of our users today to defer this task until later.
-///
-/// ---- Runtime filters ----
-/// HdfsParquetScanner is able to apply runtime filters that arrive before or during
-/// scanning. Filters are applied at both the row group (see AssembleRows()) and row (see
-/// ReadRow()) scope. If all filter predicates do not pass, the row or row group will be
-/// excluded from output. Only partition-column filters are applied at AssembleRows(). The
-/// FilterContexts for these filters are cloned from the parent scan node and attached to
-/// the ScannerContext.
-class HdfsParquetScanner : public HdfsScanner {
- public:
-  HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
-  virtual ~HdfsParquetScanner() {}
-
-  /// Issue just the footer range for each file.  We'll then parse the footer and pick
-  /// out the columns we want. 'files' must not be empty.
-  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
-                                   const std::vector<HdfsFileDesc*>& files)
-                                   WARN_UNUSED_RESULT;
-
-  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
-  virtual Status ProcessSplit() WARN_UNUSED_RESULT;
-  virtual void Close(RowBatch* row_batch);
-
-  /// Codegen ProcessScratchBatch(). Stores the resulting function in
-  /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
-  static Status Codegen(HdfsScanNodeBase* node,
-      const std::vector<ScalarExpr*>& conjuncts,
-      llvm::Function** process_scratch_batch_fn)
-      WARN_UNUSED_RESULT;
-
-  /// Initializes a ParquetTimestampDecoder depending on writer, timezone, and the schema
-  /// of the column.
-  ParquetTimestampDecoder CreateTimestampDecoder(const parquet::SchemaElement& element);
-
-  /// The rep and def levels are set to this value to indicate the end of a row group.
-  static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();
-  /// Indicates an invalid definition or repetition level.
-  static const int16_t INVALID_LEVEL = -1;
-  /// Indicates an invalid position value.
-  static const int16_t INVALID_POS = -1;
-
-  /// Class name in LLVM IR.
-  static const char* LLVM_CLASS_NAME;
-
- private:
-  friend class ParquetColumnReader;
-  friend class CollectionColumnReader;
-  friend class BaseScalarColumnReader;
-  template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-  friend class ScalarColumnReader;
-  friend class BoolColumnReader;
-  friend class HdfsParquetScannerTest;
-
-  /// Index of the current row group being processed. Initialized to -1 which indicates
-  /// that we have not started processing the first row group yet (GetNext() has not yet
-  /// been called).
-  int32_t row_group_idx_;
-
-  /// Counts the number of rows processed for the current row group.
-  int64_t row_group_rows_read_;
-
-  /// Indicates whether we should advance to the next row group in the next GetNext().
-  /// Starts out as true to move to the very first row group.
-  bool advance_row_group_;
-
-  boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_;
-
-  /// Tuple to hold values when reading parquet::Statistics. Owned by perm_pool_.
-  Tuple* min_max_tuple_;
-
-  /// Clone of Min/max statistics conjunct evaluators. Has the same life time as
-  /// the scanner. Stored in 'obj_pool_'.
-  vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
-
-  /// Pool used for allocating caches of definition/repetition levels and tuples for
-  /// dictionary filtering. The definition/repetition levels are populated by the
-  /// level readers. The pool is freed in Close().
-  boost::scoped_ptr<MemPool> perm_pool_;
-
-  /// Number of scratch batches processed so far.
-  int64_t row_batches_produced_;
-
-  /// Column reader for each top-level materialized slot in the output tuple.
-  std::vector<ParquetColumnReader*> column_readers_;
-
-  /// Column readers will write slot values into this scratch batch for
-  /// top-level tuples. See AssembleRows().
-  boost::scoped_ptr<ScratchTupleBatch> scratch_batch_;
-
-  /// File metadata thrift object
-  parquet::FileMetaData file_metadata_;
-
-  /// Version of the application that wrote this file.
-  ParquetFileVersion file_version_;
-
-  /// Scan range for the metadata.
-  const io::ScanRange* metadata_range_;
-
-  /// Pool to copy dictionary page buffer into. This pool is shared across all the
-  /// pages in a column chunk.
-  boost::scoped_ptr<MemPool> dictionary_pool_;
-
-  /// Column readers that are eligible for dictionary filtering.
-  /// These are pointers to elements of column_readers_. Materialized columns that are
-  /// dictionary encoded correspond to scalar columns that are either top-level columns
-  /// or nested within a collection. CollectionColumnReaders are not eligible for
-  /// dictionary filtering so are not included.
-  std::vector<BaseScalarColumnReader*> dict_filterable_readers_;
-
-  /// Column readers that are not eligible for dictionary filtering.
-  /// These are pointers to elements of column_readers_. The readers are either top-level
-  /// or nested within a collection.
-  std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_;
-
-  /// Flattened list of all scalar column readers in column_readers_.
-  std::vector<BaseScalarColumnReader*> scalar_readers_;
-
-  /// Flattened collection column readers that point to readers in column_readers_.
-  std::vector<CollectionColumnReader*> collection_readers_;
-
-  /// Memory used to store the tuples used for dictionary filtering. Tuples owned by
-  /// perm_pool_.
-  std::unordered_map<const TupleDescriptor*, Tuple*> dict_filter_tuple_map_;
-
-  /// Timer for materializing rows.  This ignores time getting the next buffer.
-  ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
-
-  /// Average and min/max time spent processing the footer by each split.
-  RuntimeProfile::SummaryStatsCounter* process_footer_timer_stats_;
-
-  /// Number of columns that need to be read.
-  RuntimeProfile::Counter* num_cols_counter_;
-
-  /// Number of row groups that are skipped because of Parquet row group statistics.
-  RuntimeProfile::Counter* num_stats_filtered_row_groups_counter_;
-
-  /// Number of row groups that need to be read.
-  RuntimeProfile::Counter* num_row_groups_counter_;
-
-  /// Number of scanners that end up doing no reads because their splits don't overlap
-  /// with the midpoint of any row-group in the file.
-  RuntimeProfile::Counter* num_scanners_with_no_reads_counter_;
-
-  /// Number of row groups skipped due to dictionary filter
-  RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_;
-
-  /// Number of collection items read in current row batch. It is a scanner-local counter
-  /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
-  /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
-  /// AssembleRows() and then is reset to 0.
-  int64_t coll_items_read_counter_;
-
-  typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
-  /// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
-  ProcessScratchBatchFn codegend_process_scratch_batch_fn_;
-
-  const char* filename() const { return metadata_range_->file(); }
-
-  virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
-
-  /// Evaluates the min/max predicates of the 'scan_node_' using the parquet::Statistics
-  /// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to
-  /// compute the statistics. Sets 'skip_row_group' to true if the row group can be
-  /// skipped, 'false' otherwise.
-  Status EvaluateStatsConjuncts(const parquet::FileMetaData& file_metadata,
-      const parquet::RowGroup& row_group, bool* skip_row_group) WARN_UNUSED_RESULT;
-
-  /// Advances 'row_group_idx_' to the next non-empty row group and initializes
-  /// the column readers to scan it. Recoverable errors are logged to the runtime
-  /// state. Only returns a non-OK status if a non-recoverable error is encountered
-  /// (or abort_on_error is true). If OK is returned, 'parse_status_' is guaranteed
-  /// to be OK as well.
-  Status NextRowGroup() WARN_UNUSED_RESULT;
-
-  /// Reads data using 'column_readers' to materialize top-level tuples into 'row_batch'.
-  /// Returns a non-OK status if a non-recoverable error was encountered and execution
-  /// of this query should be terminated immediately.
-  /// May set *skip_row_group to indicate that the current row group should be skipped,
-  /// e.g., due to a parse error, but execution should continue.
-  Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
-      RowBatch* row_batch, bool* skip_row_group) WARN_UNUSED_RESULT;
-
-  /// Commit num_rows to the given row batch.
-  /// Returns OK if the query is not cancelled and hasn't exceeded any mem limits.
-  /// Scanner can call this with 0 rows to flush any pending resources (attached pools
-  /// and io buffers) to minimize memory consumption.
-  Status CommitRows(RowBatch* dst_batch, int num_rows) WARN_UNUSED_RESULT;
-
-  /// Evaluates runtime filters and conjuncts (if any) against the tuples in
-  /// 'scratch_batch_', and adds the surviving tuples to the given batch.
-  /// Transfers the ownership of tuple memory to the target batch when the
-  /// scratch batch is exhausted.
-  /// Returns the number of rows that should be committed to the given batch.
-  int TransferScratchTuples(RowBatch* dst_batch);
-
-  /// Processes a single row batch for TransferScratchTuples, looping over scratch_batch_
-  /// until it is exhausted or the output is full. Called for the case when there are
-  /// materialized tuples. This is a separate function so it can be codegened.
-  int ProcessScratchBatch(RowBatch* dst_batch);
-
-  /// Reads data using 'column_readers' to materialize the tuples of a CollectionValue
-  /// allocated from 'coll_value_builder'. Increases 'coll_items_read_counter_' by the
-  /// number of items in this collection and descendant collections.
-  ///
-  /// 'new_collection_rep_level' indicates when the end of the collection has been
-  /// reached, namely when current_rep_level <= new_collection_rep_level.
-  ///
-  /// Returns true when the end of the current collection is reached, and execution can
-  /// be safely resumed.
-  /// Returns false if execution should be aborted due to:
-  /// - parse_error_ is set
-  /// - query is cancelled
-  /// - scan node limit was reached
-  /// When false is returned the column_readers are left in an undefined state and
-  /// execution should be aborted immediately by the caller.
-  bool AssembleCollection(const std::vector<ParquetColumnReader*>& column_readers,
-      int new_collection_rep_level, CollectionValueBuilder* coll_value_builder);
-
-  /// Function used by AssembleCollection() to materialize a single collection item
-  /// into 'tuple'. Returns false if execution should be aborted for some reason,
-  /// otherwise returns true.
-  /// If 'materialize_tuple' is false, only advances the column readers' levels,
-  /// and does not read any data values.
-  inline bool ReadCollectionItem(const std::vector<ParquetColumnReader*>& column_readers,
-      bool materialize_tuple, MemPool* pool, Tuple* tuple) const;
-
-  /// Process the file footer and parse file_metadata_.  This should be called with the
-  /// last FOOTER_SIZE bytes in context_.
-  Status ProcessFooter() WARN_UNUSED_RESULT;
-
-  /// Populates 'column_readers' for the slots in 'tuple_desc', including creating child
-  /// readers for any collections. Schema resolution is handled in this function as
-  /// well. Fills in the appropriate template tuple slot with NULL for any materialized
-  /// fields missing in the file.
-  Status CreateColumnReaders(const TupleDescriptor& tuple_desc,
-      const ParquetSchemaResolver& schema_resolver,
-      std::vector<ParquetColumnReader*>* column_readers) WARN_UNUSED_RESULT;
-
-  /// Returns the total number of scalar column readers in 'column_readers', including
-  /// the children of collection readers.
-  int CountScalarColumns(const std::vector<ParquetColumnReader*>& column_readers);
-
-  /// Creates a column reader that reads one value for each item in the table or
-  /// collection element corresponding to 'parent_path'. 'parent_path' should point to
-  /// either a collection element or the root schema (i.e. empty path). The returned
-  /// reader has no slot desc associated with it, meaning only NextLevels() and not
-  /// ReadValue() can be called on it.
-  ///
-  /// This is used for counting item values, rather than materializing any values. For
-  /// example, in a count(*) over a collection, there are no values to materialize, but we
-  /// still need to iterate over every item in the collection to count them.
-  Status CreateCountingReader(const SchemaPath& parent_path,
-      const ParquetSchemaResolver& schema_resolver,
-      ParquetColumnReader** reader)
-      WARN_UNUSED_RESULT;
-
-  /// Walks file_metadata_ and initiates reading the materialized columns.  This
-  /// initializes 'scalar_readers_' and divides reservation between the columns but
-  /// does not start any scan ranges.
-  Status InitScalarColumns() WARN_UNUSED_RESULT;
-
-  /// Decides how to divide stream_->reservation() between the columns. May increase
-  /// the reservation if more reservation would enable more efficient I/O for the
-  /// current columns being scanned. Sets the reservation on each corresponding reader
-  /// in 'column_readers'.
-  Status DivideReservationBetweenColumns(
-      const std::vector<BaseScalarColumnReader*>& column_readers);
-
-  /// Compute the ideal reservation to scan a file with scan range lengths
-  /// 'col_range_lengths' given the min and max buffer size of the singleton DiskIoMgr
-  /// in ExecEnv.
-  static int64_t ComputeIdealReservation(const std::vector<int64_t>& col_range_lengths);
-
-  /// Helper for DivideReservationBetweenColumns(). Implements the core algorithm for
-  /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
-  /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
-  /// a vector with an entry per column with the index into 'col_range_lengths' and the
-  /// amount of reservation in bytes to give to that column.
-  static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper(
-      int64_t min_buffer_size, int64_t max_buffer_size,
-      const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute);
-
-  /// Initializes the column readers in collection_readers_.
-  void InitCollectionColumns();
-
-  /// Initialize dictionaries for all column readers
-  Status InitDictionaries(const std::vector<BaseScalarColumnReader*>& column_readers)
-      WARN_UNUSED_RESULT;
-
-  /// Performs some validation once we've reached the end of a row group to help detect
-  /// bugs or bad input files.
-  Status ValidateEndOfRowGroup(const std::vector<ParquetColumnReader*>& column_readers,
-      int row_group_idx, int64_t rows_read) WARN_UNUSED_RESULT;
-
-  /// Part of the HdfsScanner interface, not used in Parquet.
-  Status InitNewRange() WARN_UNUSED_RESULT { return Status::OK(); }
-
-  /// Transfers the remaining resources backing tuples such as IO buffers and memory
-  /// from mem pools to the given row batch. Closes all column readers.
-  /// Should be called after completing a row group and when returning the last batch.
-  void FlushRowGroupResources(RowBatch* row_batch);
-
-  /// Releases resources associated with a row group that was skipped and closes all
-  /// column readers. Should be called after skipping a row group from which no rows
-  /// were returned.
-  void ReleaseSkippedRowGroupResources();
-
-  /// Evaluates whether the column reader is eligible for dictionary predicates
-  bool IsDictFilterable(ParquetColumnReader* col_reader);
-
-  /// Evaluates whether the column reader is eligible for dictionary predicates.
-  bool IsDictFilterable(BaseScalarColumnReader* col_reader);
-
-  /// Partitions the readers into scalar and collection readers. The collection readers
-  /// are flattened into collection_readers_. The scalar readers are partitioned into
-  /// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether
-  /// dictionary filtering is enabled and the reader can be dictionary filtered. All
-  /// scalar readers are also flattened into scalar_readers_.
-  void PartitionReaders(const vector<ParquetColumnReader*>& readers,
-                        bool can_eval_dict_filters);
-
-  /// Divides the column readers into dict_filterable_readers_,
-  /// non_dict_filterable_readers_ and collection_readers_. Allocates memory for
-  /// dict_filter_tuple_map_.
-  Status InitDictFilterStructures() WARN_UNUSED_RESULT;
-
-  /// Returns true if all of the data pages in the column chunk are dictionary encoded
-  bool IsDictionaryEncoded(const parquet::ColumnMetaData& col_metadata);
-
-  /// Checks to see if this row group can be eliminated based on applying conjuncts
-  /// to the dictionary values. Specifically, if any dictionary-encoded column has
-  /// no values that pass the relevant conjuncts, then the row group can be skipped.
-  Status EvalDictionaryFilters(const parquet::RowGroup& row_group,
-      bool* skip_row_group) WARN_UNUSED_RESULT;
-};
-
-} // namespace impala
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
deleted file mode 100644
index dd60efd..0000000
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ /dev/null
@@ -1,1321 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hdfs-parquet-table-writer.h"
-
-#include <boost/unordered_set.hpp>
-
-#include "common/version.h"
-#include "exec/hdfs-table-sink.h"
-#include "exec/parquet-column-stats.inline.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "rpc/thrift-util.h"
-#include "runtime/decimal-value.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/raw-value.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/string-value.inline.h"
-#include "util/bit-stream-utils.h"
-#include "util/bit-util.h"
-#include "util/buffer-builder.h"
-#include "util/compress.h"
-#include "util/debug-util.h"
-#include "util/dict-encoding.h"
-#include "util/hdfs-util.h"
-#include "util/string-util.h"
-#include "util/rle-encoding.h"
-
-#include <sstream>
-
-#include "gen-cpp/ImpalaService_types.h"
-
-#include "common/names.h"
-using namespace impala;
-using namespace apache::thrift;
-
-// Managing file sizes: We need to estimate how big the files being buffered
-// are in order to split them correctly in HDFS. Having a file that is too big
-// will cause remote reads (parquet files are non-splittable).
-// It's too expensive to compute the exact file sizes as the rows are buffered
-// since the values in the current pages are only encoded/compressed when the page
-// is full. Once the page is full, we encode and compress it, at which point we know
-// the exact on file size.
-// The current buffered pages (one for each column) can have a very poor estimate.
-// To adjust for this, we aim for a slightly smaller file size than the ideal.
-//
-// Class that encapsulates all the state for writing a single column.  This contains
-// all the buffered pages as well as the metadata (e.g. byte sizes, num values, etc).
-// This is intended to be created once per writer per column and reused across
-// row groups.
-// We currently accumulate all the data pages for an entire row group per column
-// before flushing them.  This can be pretty large (hundreds of MB) but we can't
-// fix this without collocated files in HDFS.  With collocated files, the minimum
-// we'd need to buffer is 1 page per column so on the order of 1MB (although we might
-// decide to buffer a few pages for better HDFS write performance).
-// Pages are reused between flushes.  They are created on demand as necessary and
-// recycled after a flush.
-// As rows come in, we accumulate the encoded values into the values_ and def_levels_
-// buffers. When we've accumulated a page worth's of data, we combine values_ and
-// def_levels_ into a single buffer that would be the exact bytes (with no gaps) in
-// the file. The combined buffer is compressed if compression is enabled and we
-// keep the combined/compressed buffer until we need to flush the file. The
-// values_ and def_levels_ are then reused for the next page.
-//
-// TODO: For codegen, we would codegen the AppendRow() function for each column.
-// This codegen is specific to the column expr (and type) and encoding.  The
-// parent writer object would combine all the generated AppendRow from all
-// the columns and run that function over row batches.
-// TODO: we need to pass in the compression from the FE/metadata
-
-DECLARE_bool(enable_parquet_page_index_writing_debug_only);
-
-namespace impala {
-
-// Base class for column writers. This contains most of the logic except for
-// the type specific functions which are implemented in the subclasses.
-class HdfsParquetTableWriter::BaseColumnWriter {
- public:
-  // expr - the expression to generate output values for this column.
-  BaseColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* expr_eval,
-      const THdfsCompression::type& codec)
-    : parent_(parent),
-      expr_eval_(expr_eval),
-      codec_(codec),
-      page_size_(DEFAULT_DATA_PAGE_SIZE),
-      current_page_(nullptr),
-      num_values_(0),
-      total_compressed_byte_size_(0),
-      total_uncompressed_byte_size_(0),
-      dict_encoder_base_(nullptr),
-      def_levels_(nullptr),
-      values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
-      page_stats_base_(nullptr),
-      row_group_stats_base_(nullptr),
-      table_sink_mem_tracker_(parent_->parent_->mem_tracker()) {
-    static_assert(std::is_same<decltype(parent_->parent_), HdfsTableSink*>::value,
-        "'table_sink_mem_tracker_' must point to the mem tracker of an HdfsTableSink");
-    def_levels_ = parent_->state_->obj_pool()->Add(
-        new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
-                       DEFAULT_DATA_PAGE_SIZE, 1));
-    values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
-  }
-
-  virtual ~BaseColumnWriter() {}
-
-  // Called after the constructor to initialize the column writer.
-  Status Init() WARN_UNUSED_RESULT {
-    Reset();
-    RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_, &compressor_));
-    return Status::OK();
-  }
-
-  // Appends the row to this column.  This buffers the value into a data page.  Returns
-  // error if the space needed for the encoded value is larger than the data page size.
-  // TODO: this needs to be batch based, instead of row based for better performance.
-  // This is a bit trickier to handle the case where only a partial row batch can be
-  // output to the current file because it reaches the max file size.  Enabling codegen
-  // would also solve this problem.
-  Status AppendRow(TupleRow* row) WARN_UNUSED_RESULT;
-
-  // Flushes all buffered data pages to the file.
-  // *file_pos is an output parameter and will be incremented by
-  // the number of bytes needed to write all the data pages for this column.
-  // first_data_page and first_dictionary_page are also out parameters and
-  // will contain the byte offset for the data page and dictionary page.  They
-  // will be set to -1 if the column does not contain that type of page.
-  Status Flush(int64_t* file_pos, int64_t* first_data_page,
-      int64_t* first_dictionary_page) WARN_UNUSED_RESULT;
-
-  // Materializes the column statistics to the per-file MemPool so they are available
-  // after their row batch buffer has been freed.
-  Status MaterializeStatsValues() WARN_UNUSED_RESULT {
-    RETURN_IF_ERROR(row_group_stats_base_->MaterializeStringValuesToInternalBuffers());
-    RETURN_IF_ERROR(page_stats_base_->MaterializeStringValuesToInternalBuffers());
-    return Status::OK();
-  }
-
-  // Encodes the row group statistics into a parquet::Statistics object and attaches it to
-  // 'meta_data'.
-  void EncodeRowGroupStats(parquet::ColumnMetaData* meta_data) {
-    DCHECK(row_group_stats_base_ != nullptr);
-    if (row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
-      row_group_stats_base_->EncodeToThrift(&meta_data->statistics);
-      meta_data->__isset.statistics = true;
-    }
-  }
-
-  // Resets all the data accumulated for this column.  Memory can now be reused for
-  // the next row group.
-  // Any data for previous row groups must be reset (e.g. dictionaries).
-  // Subclasses must call this if they override this function.
-  virtual void Reset() {
-    num_values_ = 0;
-    total_compressed_byte_size_ = 0;
-    current_encoding_ = parquet::Encoding::PLAIN;
-    next_page_encoding_ = parquet::Encoding::PLAIN;
-    pages_.clear();
-    current_page_ = nullptr;
-    column_encodings_.clear();
-    dict_encoding_stats_.clear();
-    data_encoding_stats_.clear();
-    // Repetition/definition level encodings are constant. Incorporate them here.
-    column_encodings_.insert(parquet::Encoding::RLE);
-    offset_index_.page_locations.clear();
-    column_index_.null_pages.clear();
-    column_index_.min_values.clear();
-    column_index_.max_values.clear();
-    table_sink_mem_tracker_->Release(page_index_memory_consumption_);
-    page_index_memory_consumption_ = 0;
-    column_index_.null_counts.clear();
-    valid_column_index_ = true;
-  }
-
-  // Close this writer. This is only called after Flush() and no more rows will
-  // be added.
-  void Close() {
-    if (compressor_.get() != nullptr) compressor_->Close();
-    if (dict_encoder_base_ != nullptr) dict_encoder_base_->Close();
-    // We must release the memory consumption of this column writer.
-    table_sink_mem_tracker_->Release(page_index_memory_consumption_);
-    page_index_memory_consumption_ = 0;
-  }
-
-  const ColumnType& type() const { return expr_eval_->root().type(); }
-  uint64_t num_values() const { return num_values_; }
-  uint64_t total_compressed_size() const { return total_compressed_byte_size_; }
-  uint64_t total_uncompressed_size() const { return total_uncompressed_byte_size_; }
-  parquet::CompressionCodec::type GetParquetCodec() const {
-    return ConvertImpalaToParquetCodec(codec_);
-  }
-
- protected:
-  friend class HdfsParquetTableWriter;
-
-  Status AddMemoryConsumptionForPageIndex(int64_t new_memory_allocation) {
-    if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
-      return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
-          "Failed to allocate memory for Parquet page index.", new_memory_allocation);
-    }
-    page_index_memory_consumption_ += new_memory_allocation;
-    return Status::OK();
-  }
-
-  Status ReserveOffsetIndex(int64_t capacity) {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
-    RETURN_IF_ERROR(
-        AddMemoryConsumptionForPageIndex(capacity * sizeof(parquet::PageLocation)));
-    offset_index_.page_locations.reserve(capacity);
-    return Status::OK();
-  }
-
-  void AddLocationToOffsetIndex(const parquet::PageLocation& location) {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return;
-    offset_index_.page_locations.push_back(location);
-  }
-
-  Status AddPageStatsToColumnIndex() {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
-    parquet::Statistics page_stats;
-    page_stats_base_->EncodeToThrift(&page_stats);
-    // If pages_stats contains min_value and max_value, then append them to min_values_
-    // and max_values_ and also mark the page as not null. In case min and max values are
-    // not set, push empty strings to maintain the consistency of the index and mark the
-    // page as null. Always push the null_count.
-    string min_val;
-    string max_val;
-    if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
-      Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
-          &min_val);
-      Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
-          &max_val);
-      if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
-      column_index_.null_pages.push_back(false);
-    } else {
-      DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
-      column_index_.null_pages.push_back(true);
-      DCHECK_EQ(page_stats.null_count, num_values_);
-    }
-    RETURN_IF_ERROR(
-        AddMemoryConsumptionForPageIndex(min_val.capacity() + max_val.capacity()));
-    column_index_.min_values.emplace_back(std::move(min_val));
-    column_index_.max_values.emplace_back(std::move(max_val));
-    column_index_.null_counts.push_back(page_stats.null_count);
-    return Status::OK();
-  }
-
-  // Encodes value into the current page output buffer and updates the column statistics
-  // aggregates. Returns true if the value was appended successfully to the current page.
-  // Returns false if the value was not appended to the current page and the caller can
-  // create a new page and try again with the same value. May change
-  // 'next_page_encoding_' if the encoding for the next page should be different - e.g.
-  // if a dictionary overflowed and dictionary encoding is no longer viable.
-  // *bytes_needed will contain the (estimated) number of bytes needed to successfully
-  // encode the value in the page.
-  // Implemented in the subclass.
-  virtual bool ProcessValue(void* value, int64_t* bytes_needed) WARN_UNUSED_RESULT = 0;
-
-  // Encodes out all data for the current page and updates the metadata.
-  virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT;
-
-  // Update current_page_ to a new page, reusing pages allocated if possible.
-  void NewPage();
-
-  // Writes out the dictionary encoded data buffered in dict_encoder_.
-  void WriteDictDataPage();
-
-  struct DataPage {
-    // Page header.  This is a union of all page types.
-    parquet::PageHeader header;
-
-    // Number of bytes needed to store definition levels.
-    int num_def_bytes;
-
-    // This is the payload for the data page.  This includes the definition/repetition
-    // levels data and the encoded values.  If compression is enabled, this is the
-    // compressed data.
-    uint8_t* data;
-
-    // If true, this data page has been finalized.  All sizes are computed, header is
-    // fully populated and any compression is done.
-    bool finalized;
-
-    // Number of non-null values
-    int num_non_null;
-  };
-
-  HdfsParquetTableWriter* parent_;
-  ScalarExprEvaluator* expr_eval_;
-
-  THdfsCompression::type codec_;
-
-  // Compression codec for this column.  If nullptr, this column is will not be
-  // compressed.
-  scoped_ptr<Codec> compressor_;
-
-  // Size of newly created pages. Defaults to DEFAULT_DATA_PAGE_SIZE and is increased
-  // when pages are not big enough. This only happens when there are enough unique values
-  // such that we switch from PLAIN_DICTIONARY to PLAIN encoding and then have very
-  // large values (i.e. greater than DEFAULT_DATA_PAGE_SIZE).
-  // TODO: Consider removing and only creating a single large page as necessary.
-  int64_t page_size_;
-
-  // Pages belong to this column chunk. We need to keep them in memory in order to write
-  // them together.
-  vector<DataPage> pages_;
-
-  // Pointer to the current page in 'pages_'. Not owned.
-  DataPage* current_page_;
-
-  // Total number of values across all pages, including NULL.
-  int64_t num_values_;
-  int64_t total_compressed_byte_size_;
-  int64_t total_uncompressed_byte_size_;
-  // Encoding of the current page.
-  parquet::Encoding::type current_encoding_;
-  // Encoding to use for the next page. By default, the same as 'current_encoding_'.
-  // Used by the column writer to switch encoding while writing a column, e.g. if the
-  // dictionary overflows.
-  parquet::Encoding::type next_page_encoding_;
-
-  // Set of all encodings used in the column chunk
-  unordered_set<parquet::Encoding::type> column_encodings_;
-
-  // Map from the encoding to the number of pages in the column chunk with this encoding
-  // These are used to construct the PageEncodingStats, which provide information
-  // about encoding usage for each different page type. Currently, only dictionary
-  // and data pages are used.
-  unordered_map<parquet::Encoding::type, int> dict_encoding_stats_;
-  unordered_map<parquet::Encoding::type, int> data_encoding_stats_;
-
-  // Created, owned, and set by the derived class.
-  DictEncoderBase* dict_encoder_base_;
-
-  // Rle encoder object for storing definition levels, owned by instances of this class.
-  // For non-nested schemas, this always uses 1 bit per row. This is reused across pages
-  // since the underlying buffer is copied out when the page is finalized.
-  RleEncoder* def_levels_;
-
-  // Data for buffered values. This is owned by instances of this class and gets reused
-  // across pages.
-  uint8_t* values_buffer_;
-  // The size of values_buffer_.
-  int values_buffer_len_;
-
-  // Pointers to statistics, created, owned, and set by the derived class.
-  ColumnStatsBase* page_stats_base_;
-  ColumnStatsBase* row_group_stats_base_;
-
-  // OffsetIndex stores the locations of the pages.
-  parquet::OffsetIndex offset_index_;
-
-  // ColumnIndex stores the statistics of the pages.
-  parquet::ColumnIndex column_index_;
-
-  // Pointer to the HdfsTableSink's MemTracker.
-  MemTracker* table_sink_mem_tracker_;
-
-  // Memory consumption of the min/max values in the page index.
-  int64_t page_index_memory_consumption_ = 0;
-
-  // Only write ColumnIndex when 'valid_column_index_' is true. We always need to write
-  // the OffsetIndex though.
-  bool valid_column_index_ = true;
-};
-
-// Per type column writer.
-template<typename T>
-class HdfsParquetTableWriter::ColumnWriter :
-    public HdfsParquetTableWriter::BaseColumnWriter {
- public:
-  ColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
-      const THdfsCompression::type& codec)
-    : BaseColumnWriter(parent, eval, codec),
-      num_values_since_dict_size_check_(0),
-      plain_encoded_value_size_(
-          ParquetPlainEncoder::EncodedByteSize(eval->root().type())) {
-    DCHECK_NE(eval->root().type().type, TYPE_BOOLEAN);
-    // IMPALA-7304: Don't write column index for floating-point columns until
-    // PARQUET-1222 is resolved.
-    if (std::is_floating_point<T>::value) valid_column_index_ = false;
-  }
-
-  virtual void Reset() {
-    BaseColumnWriter::Reset();
-    // IMPALA-7304: Don't write column index for floating-point columns until
-    // PARQUET-1222 is resolved.
-    if (std::is_floating_point<T>::value) valid_column_index_ = false;
-    // Default to dictionary encoding.  If the cardinality ends up being too high,
-    // it will fall back to plain.
-    current_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
-    next_page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
-    dict_encoder_.reset(
-        new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_,
-            parent_->parent_->mem_tracker()));
-    dict_encoder_base_ = dict_encoder_.get();
-    page_stats_.reset(
-        new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
-    page_stats_base_ = page_stats_.get();
-    row_group_stats_.reset(
-        new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
-    row_group_stats_base_ = row_group_stats_.get();
-  }
-
- protected:
-  virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
-    if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
-      if (UNLIKELY(num_values_since_dict_size_check_ >=
-                   DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
-        num_values_since_dict_size_check_ = 0;
-        if (dict_encoder_->EstimatedDataEncodedSize() >= page_size_) return false;
-      }
-      ++num_values_since_dict_size_check_;
-      *bytes_needed = dict_encoder_->Put(*CastValue(value));
-      // If the dictionary contains the maximum number of values, switch to plain
-      // encoding for the next page. The current page is full and must be written out.
-      if (UNLIKELY(*bytes_needed < 0)) {
-        next_page_encoding_ = parquet::Encoding::PLAIN;
-        return false;
-      }
-      parent_->file_size_estimate_ += *bytes_needed;
-    } else if (current_encoding_ == parquet::Encoding::PLAIN) {
-      T* v = CastValue(value);
-      *bytes_needed = plain_encoded_value_size_ < 0 ?
-          ParquetPlainEncoder::ByteSize<T>(*v) :
-          plain_encoded_value_size_;
-      if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) {
-        return false;
-      }
-      uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size;
-      int64_t written_len =
-          ParquetPlainEncoder::Encode(*v, plain_encoded_value_size_, dst_ptr);
-      DCHECK_EQ(*bytes_needed, written_len);
-      current_page_->header.uncompressed_page_size += written_len;
-    } else {
-      // TODO: support other encodings here
-      DCHECK(false);
-    }
-    page_stats_->Update(*CastValue(value));
-    return true;
-  }
-
- private:
-  // The period, in # of rows, to check the estimated dictionary page size against
-  // the data page size. We want to start a new data page when the estimated size
-  // is at least that big. The estimated size computation is not very cheap and
-  // we can tolerate going over the data page size by some amount.
-  // The expected byte size per dictionary value is < 1B and at most 2 bytes so the
-  // error is pretty low.
-  // TODO: is there a better way?
-  static const int DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD = 100;
-
-  // Encoder for dictionary encoding for different columns. Only one is set.
-  scoped_ptr<DictEncoder<T>> dict_encoder_;
-
-  // The number of values added since we last checked the dictionary.
-  int num_values_since_dict_size_check_;
-
-  // Size of each encoded value in plain encoding. -1 if the type is variable-length.
-  int64_t plain_encoded_value_size_;
-
-  // Temporary string value to hold CHAR(N)
-  StringValue temp_;
-
-  // Tracks statistics per page. These are written out to the page index.
-  scoped_ptr<ColumnStats<T>> page_stats_;
-
-  // Tracks statistics per row group. This gets reset when starting a new row group.
-  scoped_ptr<ColumnStats<T>> row_group_stats_;
-
-  // Converts a slot pointer to a raw value suitable for encoding
-  inline T* CastValue(void* value) {
-    return reinterpret_cast<T*>(value);
-  }
-};
-
-template<>
-inline StringValue* HdfsParquetTableWriter::ColumnWriter<StringValue>::CastValue(
-    void* value) {
-  if (type().type == TYPE_CHAR) {
-    temp_.ptr = reinterpret_cast<char*>(value);
-    temp_.len = StringValue::UnpaddedCharLength(temp_.ptr, type().len);
-    return &temp_;
-  }
-  return reinterpret_cast<StringValue*>(value);
-}
-
-// Bools are encoded a bit differently so subclass it explicitly.
-class HdfsParquetTableWriter::BoolColumnWriter :
-    public HdfsParquetTableWriter::BaseColumnWriter {
- public:
-  BoolColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
-      const THdfsCompression::type& codec)
-    : BaseColumnWriter(parent, eval, codec),
-      page_stats_(parent_->reusable_col_mem_pool_.get(), -1),
-      row_group_stats_(parent_->reusable_col_mem_pool_.get(), -1) {
-    DCHECK_EQ(eval->root().type().type, TYPE_BOOLEAN);
-    bool_values_ = parent_->state_->obj_pool()->Add(
-        new BitWriter(values_buffer_, values_buffer_len_));
-    // Dictionary encoding doesn't make sense for bools and is not allowed by
-    // the format.
-    current_encoding_ = parquet::Encoding::PLAIN;
-    dict_encoder_base_ = nullptr;
-
-    page_stats_base_ = &page_stats_;
-    row_group_stats_base_ = &row_group_stats_;
-  }
-
- protected:
-  virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
-    bool v = *reinterpret_cast<bool*>(value);
-    if (!bool_values_->PutValue(v, 1)) return false;
-    page_stats_.Update(v);
-    return true;
-  }
-
-  virtual Status FinalizeCurrentPage() {
-    DCHECK(current_page_ != nullptr);
-    if (current_page_->finalized) return Status::OK();
-    bool_values_->Flush();
-    int num_bytes = bool_values_->bytes_written();
-    current_page_->header.uncompressed_page_size += num_bytes;
-    // Call into superclass to handle the rest.
-    RETURN_IF_ERROR(BaseColumnWriter::FinalizeCurrentPage());
-    bool_values_->Clear();
-    return Status::OK();
-  }
-
- private:
-  // Used to encode bools as single bit values. This is reused across pages.
-  BitWriter* bool_values_;
-
-  // Tracks statistics per page. These are written out to the page index.
-  ColumnStats<bool> page_stats_;
-
-  // Tracks statistics per row group. This gets reset when starting a new file.
-  ColumnStats<bool> row_group_stats_;
-};
-
-}
-
-inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) {
-  ++num_values_;
-  void* value = expr_eval_->GetValue(row);
-  if (current_page_ == nullptr) NewPage();
-
-  // Ensure that we have enough space for the definition level, but don't write it yet in
-  // case we don't have enough space for the value.
-  if (def_levels_->buffer_full()) {
-    RETURN_IF_ERROR(FinalizeCurrentPage());
-    NewPage();
-  }
-
-  // Encoding may fail for several reasons - because the current page is not big enough,
-  // because we've encoded the maximum number of unique dictionary values and need to
-  // switch to plain encoding, etc. so we may need to try again more than once.
-  // TODO: Have a clearer set of state transitions here, to make it easier to see that
-  // this won't loop forever.
-  while (true) {
-    // Nulls don't get encoded. Increment the null count of the parquet statistics.
-    if (value == nullptr) {
-      DCHECK(page_stats_base_ != nullptr);
-      page_stats_base_->IncrementNullCount(1);
-      break;
-    }
-
-    int64_t bytes_needed = 0;
-    if (ProcessValue(value, &bytes_needed)) {
-      ++current_page_->num_non_null;
-      break; // Succesfully appended, don't need to retry.
-    }
-
-    // Value didn't fit on page, try again on a new page.
-    RETURN_IF_ERROR(FinalizeCurrentPage());
-
-    // Check how much space is needed to write this value. If that is larger than the
-    // page size then increase page size and try again.
-    if (UNLIKELY(bytes_needed > page_size_)) {
-      if (bytes_needed > MAX_DATA_PAGE_SIZE) {
-        stringstream ss;
-        ss << "Cannot write value of size "
-           << PrettyPrinter::Print(bytes_needed, TUnit::BYTES) << " bytes to a Parquet "
-           << "data page that exceeds the max page limit "
-           << PrettyPrinter::Print(MAX_DATA_PAGE_SIZE , TUnit::BYTES) << ".";
-        return Status(ss.str());
-      }
-      page_size_ = bytes_needed;
-      values_buffer_len_ = page_size_;
-      values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
-    }
-    NewPage();
-  }
-
-  // Now that the value has been successfully written, write the definition level.
-  bool ret = def_levels_->Put(value != nullptr);
-  // Writing the def level will succeed because we ensured there was enough space for it
-  // above, and new pages will always have space for at least a single def level.
-  DCHECK(ret);
-
-  ++current_page_->header.data_page_header.num_values;
-  return Status::OK();
-}
-
-inline void HdfsParquetTableWriter::BaseColumnWriter::WriteDictDataPage() {
-  DCHECK(dict_encoder_base_ != nullptr);
-  DCHECK_EQ(current_page_->header.uncompressed_page_size, 0);
-  if (current_page_->num_non_null == 0) return;
-  int len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_);
-  while (UNLIKELY(len < 0)) {
-    // len < 0 indicates the data doesn't fit into a data page. Allocate a larger data
-    // page.
-    values_buffer_len_ *= 2;
-    values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
-    len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_);
-  }
-  dict_encoder_base_->ClearIndices();
-  current_page_->header.uncompressed_page_size = len;
-}
-
-Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
-   int64_t* first_data_page, int64_t* first_dictionary_page) {
-  if (current_page_ == nullptr) {
-    // This column/file is empty
-    *first_data_page = *file_pos;
-    *first_dictionary_page = -1;
-    return Status::OK();
-  }
-
-  RETURN_IF_ERROR(FinalizeCurrentPage());
-
-  *first_dictionary_page = -1;
-  // First write the dictionary page before any of the data pages.
-  if (dict_encoder_base_ != nullptr) {
-    *first_dictionary_page = *file_pos;
-    // Write dictionary page header
-    parquet::DictionaryPageHeader dict_header;
-    dict_header.num_values = dict_encoder_base_->num_entries();
-    dict_header.encoding = parquet::Encoding::PLAIN_DICTIONARY;
-    ++dict_encoding_stats_[dict_header.encoding];
-
-    parquet::PageHeader header;
-    header.type = parquet::PageType::DICTIONARY_PAGE;
-    header.uncompressed_page_size = dict_encoder_base_->dict_encoded_size();
-    header.__set_dictionary_page_header(dict_header);
-
-    // Write the dictionary page data, compressing it if necessary.
-    uint8_t* dict_buffer = parent_->per_file_mem_pool_->Allocate(
-        header.uncompressed_page_size);
-    dict_encoder_base_->WriteDict(dict_buffer);
-    if (compressor_.get() != nullptr) {
-      SCOPED_TIMER(parent_->parent_->compress_timer());
-      int64_t max_compressed_size =
-          compressor_->MaxOutputLen(header.uncompressed_page_size);
-      DCHECK_GT(max_compressed_size, 0);
-      uint8_t* compressed_data =
-          parent_->per_file_mem_pool_->Allocate(max_compressed_size);
-      header.compressed_page_size = max_compressed_size;
-      RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
-          dict_buffer, &header.compressed_page_size, &compressed_data));
-      dict_buffer = compressed_data;
-      // We allocated the output based on the guessed size, return the extra allocated
-      // bytes back to the mem pool.
-      parent_->per_file_mem_pool_->ReturnPartialAllocation(
-          max_compressed_size - header.compressed_page_size);
-    } else {
-      header.compressed_page_size = header.uncompressed_page_size;
-    }
-
-    uint8_t* header_buffer;
-    uint32_t header_len;
-    RETURN_IF_ERROR(parent_->thrift_serializer_->SerializeToBuffer(
-        &header, &header_len, &header_buffer));
-    RETURN_IF_ERROR(parent_->Write(header_buffer, header_len));
-    *file_pos += header_len;
-    total_compressed_byte_size_ += header_len;
-    total_uncompressed_byte_size_ += header_len;
-
-    RETURN_IF_ERROR(parent_->Write(dict_buffer, header.compressed_page_size));
-    *file_pos += header.compressed_page_size;
-    total_compressed_byte_size_ += header.compressed_page_size;
-    total_uncompressed_byte_size_ += header.uncompressed_page_size;
-  }
-
-  *first_data_page = *file_pos;
-  int64_t current_row_group_index = 0;
-  RETURN_IF_ERROR(ReserveOffsetIndex(pages_.size()));
-
-  // Write data pages
-  for (const DataPage& page : pages_) {
-    parquet::PageLocation location;
-
-    if (page.header.data_page_header.num_values == 0) {
-      // Skip empty pages
-      location.offset = -1;
-      location.compressed_page_size = 0;
-      location.first_row_index = -1;
-      AddLocationToOffsetIndex(location);
-      continue;
-    }
-
-    location.offset = *file_pos;
-    location.first_row_index = current_row_group_index;
-
-    // Write data page header
-    uint8_t* buffer = nullptr;
-    uint32_t len = 0;
-    RETURN_IF_ERROR(
-        parent_->thrift_serializer_->SerializeToBuffer(&page.header, &len, &buffer));
-    RETURN_IF_ERROR(parent_->Write(buffer, len));
-    *file_pos += len;
-
-    // Note that the namings are confusing here:
-    // parquet::PageHeader::compressed_page_size is the compressed page size in bytes, as
-    // its name suggests. On the other hand, parquet::PageLocation::compressed_page_size
-    // also includes the size of the page header.
-    location.compressed_page_size = page.header.compressed_page_size + len;
-    AddLocationToOffsetIndex(location);
-
-    // Write the page data
-    RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size));
-    *file_pos += page.header.compressed_page_size;
-    current_row_group_index += page.header.data_page_header.num_values;
-  }
-  return Status::OK();
-}
-
-Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
-  DCHECK(current_page_ != nullptr);
-  if (current_page_->finalized) return Status::OK();
-
-  // If the entire page was NULL, encode it as PLAIN since there is no
-  // data anyway. We don't output a useless dictionary page and it works
-  // around a parquet MR bug (see IMPALA-759 for more details).
-  if (current_page_->num_non_null == 0) current_encoding_ = parquet::Encoding::PLAIN;
-
-  if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) WriteDictDataPage();
-
-  parquet::PageHeader& header = current_page_->header;
-  header.data_page_header.encoding = current_encoding_;
-
-  // Accumulate encoding statistics
-  column_encodings_.insert(header.data_page_header.encoding);
-  ++data_encoding_stats_[header.data_page_header.encoding];
-
-  // Compute size of definition bits
-  def_levels_->Flush();
-  current_page_->num_def_bytes = sizeof(int32_t) + def_levels_->len();
-  header.uncompressed_page_size += current_page_->num_def_bytes;
-
-  // At this point we know all the data for the data page.  Combine them into one buffer.
-  uint8_t* uncompressed_data = nullptr;
-  if (compressor_.get() == nullptr) {
-    uncompressed_data =
-        parent_->per_file_mem_pool_->Allocate(header.uncompressed_page_size);
-  } else {
-    // We have compression.  Combine into the staging buffer.
-    parent_->compression_staging_buffer_.resize(
-        header.uncompressed_page_size);
-    uncompressed_data = &parent_->compression_staging_buffer_[0];
-  }
-
-  BufferBuilder buffer(uncompressed_data, header.uncompressed_page_size);
-
-  // Copy the definition (null) data
-  int num_def_level_bytes = def_levels_->len();
-
-  buffer.Append(num_def_level_bytes);
-  buffer.Append(def_levels_->buffer(), num_def_level_bytes);
-  // TODO: copy repetition data when we support nested types.
-  buffer.Append(values_buffer_, buffer.capacity() - buffer.size());
-
-  // Apply compression if necessary
-  if (compressor_.get() == nullptr) {
-    current_page_->data = uncompressed_data;
-    header.compressed_page_size = header.uncompressed_page_size;
-  } else {
-    SCOPED_TIMER(parent_->parent_->compress_timer());
-    int64_t max_compressed_size =
-        compressor_->MaxOutputLen(header.uncompressed_page_size);
-    DCHECK_GT(max_compressed_size, 0);
-    uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size);
-    header.compressed_page_size = max_compressed_size;
-    RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
-        uncompressed_data, &header.compressed_page_size, &compressed_data));
-    current_page_->data = compressed_data;
-
-    // We allocated the output based on the guessed size, return the extra allocated
-    // bytes back to the mem pool.
-    parent_->per_file_mem_pool_->ReturnPartialAllocation(
-        max_compressed_size - header.compressed_page_size);
-  }
-
-  DCHECK(page_stats_base_ != nullptr);
-  RETURN_IF_ERROR(AddPageStatsToColumnIndex());
-
-  // Update row group statistics from page statistics.
-  DCHECK(row_group_stats_base_ != nullptr);
-  row_group_stats_base_->Merge(*page_stats_base_);
-
-  // Add the size of the data page header
-  uint8_t* header_buffer;
-  uint32_t header_len = 0;
-  RETURN_IF_ERROR(parent_->thrift_serializer_->SerializeToBuffer(
-      &current_page_->header, &header_len, &header_buffer));
-
-  current_page_->finalized = true;
-  total_compressed_byte_size_ += header_len + header.compressed_page_size;
-  total_uncompressed_byte_size_ += header_len + header.uncompressed_page_size;
-  parent_->file_size_estimate_ += header_len + header.compressed_page_size;
-  def_levels_->Clear();
-  return Status::OK();
-}
-
-void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
-  pages_.push_back(DataPage());
-  current_page_ = &pages_.back();
-
-  parquet::DataPageHeader header;
-  header.num_values = 0;
-  // The code that populates the column chunk metadata's encodings field
-  // relies on these specific values for the definition/repetition level
-  // encodings.
-  header.definition_level_encoding = parquet::Encoding::RLE;
-  header.repetition_level_encoding = parquet::Encoding::RLE;
-  current_page_->header.__set_data_page_header(header);
-  current_encoding_ = next_page_encoding_;
-  current_page_->finalized = false;
-  current_page_->num_non_null = 0;
-  page_stats_base_->Reset();
-}
-
-HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state,
-    OutputPartition* output, const HdfsPartitionDescriptor* part_desc,
-    const HdfsTableDescriptor* table_desc)
-  : HdfsTableWriter(parent, state, output, part_desc, table_desc),
-    thrift_serializer_(new ThriftSerializer(true)),
-    current_row_group_(nullptr),
-    row_count_(0),
-    file_size_limit_(0),
-    reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())),
-    per_file_mem_pool_(new MemPool(parent_->mem_tracker())),
-    row_idx_(0) {}
-
-HdfsParquetTableWriter::~HdfsParquetTableWriter() {
-}
-
-Status HdfsParquetTableWriter::Init() {
-  // Initialize file metadata
-  file_metadata_.version = PARQUET_CURRENT_VERSION;
-
-  stringstream created_by;
-  created_by << "impala version " << GetDaemonBuildVersion()
-             << " (build " << GetDaemonBuildHash() << ")";
-  file_metadata_.__set_created_by(created_by.str());
-
-  // Default to snappy compressed
-  THdfsCompression::type codec = THdfsCompression::SNAPPY;
-
-  const TQueryOptions& query_options = state_->query_options();
-  if (query_options.__isset.compression_codec) {
-    codec = query_options.compression_codec;
-  }
-  if (!(codec == THdfsCompression::NONE ||
-        codec == THdfsCompression::GZIP ||
-        codec == THdfsCompression::SNAPPY)) {
-    stringstream ss;
-    ss << "Invalid parquet compression codec " << Codec::GetCodecName(codec);
-    return Status(ss.str());
-  }
-
-  VLOG_FILE << "Using compression codec: " << codec;
-
-  int num_cols = table_desc_->num_cols() - table_desc_->num_clustering_cols();
-  // When opening files using the hdfsOpenFile() API, the maximum block size is limited to
-  // 2GB.
-  int64_t min_block_size = MinBlockSize(num_cols);
-  if (min_block_size >= numeric_limits<int32_t>::max()) {
-    stringstream ss;
-    return Status(Substitute("Minimum required block size must be less than 2GB "
-        "(currently $0), try reducing the number of non-partitioning columns in the "
-        "target table (currently $1).",
-        PrettyPrinter::Print(min_block_size, TUnit::BYTES), num_cols));
-  }
-
-  columns_.resize(num_cols);
-  // Initialize each column structure.
-  for (int i = 0; i < columns_.size(); ++i) {
-    BaseColumnWriter* writer = nullptr;
-    const ColumnType& type = output_expr_evals_[i]->root().type();
-    switch (type.type) {
-      case TYPE_BOOLEAN:
-        writer = new BoolColumnWriter(this, output_expr_evals_[i], codec);
-        break;
-      case TYPE_TINYINT:
-        writer = new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec);
-        break;
-      case TYPE_SMALLINT:
-        writer = new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec);
-        break;
-      case TYPE_INT:
-        writer = new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec);
-        break;
-      case TYPE_BIGINT:
-        writer = new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec);
-        break;
-      case TYPE_FLOAT:
-        writer = new ColumnWriter<float>(this, output_expr_evals_[i], codec);
-        break;
-      case TYPE_DOUBLE:
-        writer = new ColumnWriter<double>(this, output_expr_evals_[i], codec);
-        break;
-      case TYPE_TIMESTAMP:
-        writer = new ColumnWriter<TimestampValue>(
-            this, output_expr_evals_[i], codec);
-        break;
-      case TYPE_VARCHAR:
-      case TYPE_STRING:
-      case TYPE_CHAR:
-        writer = new ColumnWriter<StringValue>(this, output_expr_evals_[i], codec);
-        break;
-      case TYPE_DECIMAL:
-        switch (output_expr_evals_[i]->root().type().GetByteSize()) {
-          case 4:
-            writer = new ColumnWriter<Decimal4Value>(
-                this, output_expr_evals_[i], codec);
-            break;
-          case 8:
-            writer = new ColumnWriter<Decimal8Value>(
-                this, output_expr_evals_[i], codec);
-            break;
-          case 16:
-            writer = new ColumnWriter<Decimal16Value>(
-                this, output_expr_evals_[i], codec);
-            break;
-          default:
-            DCHECK(false);
-        }
-        break;
-      default:
-        DCHECK(false);
-    }
-    columns_[i].reset(writer);
-    RETURN_IF_ERROR(columns_[i]->Init());
-  }
-  RETURN_IF_ERROR(CreateSchema());
-  return Status::OK();
-}
-
-Status HdfsParquetTableWriter::CreateSchema() {
-  int num_clustering_cols = table_desc_->num_clustering_cols();
-
-  // Create flattened tree with a single root.
-  file_metadata_.schema.resize(columns_.size() + 1);
-  file_metadata_.schema[0].__set_num_children(columns_.size());
-  file_metadata_.schema[0].name = "schema";
-
-  for (int i = 0; i < columns_.size(); ++i) {
-    parquet::SchemaElement& node = file_metadata_.schema[i + 1];
-    const ColumnType& type = output_expr_evals_[i]->root().type();
-    node.name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    node.__set_type(ConvertInternalToParquetType(type.type));
-    node.__set_repetition_type(parquet::FieldRepetitionType::OPTIONAL);
-    if (type.type == TYPE_DECIMAL) {
-      // This column is type decimal. Update the file metadata to include the
-      // additional fields:
-      //  1) converted_type: indicate this is really a decimal column.
-      //  2) type_length: the number of bytes used per decimal value in the data
-      //  3) precision/scale
-      node.__set_converted_type(parquet::ConvertedType::DECIMAL);
-      node.__set_type_length(
-          ParquetPlainEncoder::DecimalSize(output_expr_evals_[i]->root().type()));
-      node.__set_scale(output_expr_evals_[i]->root().type().scale);
-      node.__set_precision(output_expr_evals_[i]->root().type().precision);
-    } else if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR ||
-        (type.type == TYPE_STRING &&
-         state_->query_options().parquet_annotate_strings_utf8)) {
-      node.__set_converted_type(parquet::ConvertedType::UTF8);
-    } else if (type.type == TYPE_TINYINT) {
-      node.__set_converted_type(parquet::ConvertedType::INT_8);
-    } else if (type.type == TYPE_SMALLINT) {
-      node.__set_converted_type(parquet::ConvertedType::INT_16);
-    } else if (type.type == TYPE_INT) {
-      node.__set_converted_type(parquet::ConvertedType::INT_32);
-    } else if (type.type == TYPE_BIGINT) {
-      node.__set_converted_type(parquet::ConvertedType::INT_64);
-    }
-  }
-
-  return Status::OK();
-}
-
-Status HdfsParquetTableWriter::AddRowGroup() {
-  if (current_row_group_ != nullptr) RETURN_IF_ERROR(FlushCurrentRowGroup());
-  file_metadata_.row_groups.push_back(parquet::RowGroup());
-  current_row_group_ = &file_metadata_.row_groups[file_metadata_.row_groups.size() - 1];
-
-  // Initialize new row group metadata.
-  int num_clustering_cols = table_desc_->num_clustering_cols();
-  current_row_group_->columns.resize(columns_.size());
-  for (int i = 0; i < columns_.size(); ++i) {
-    parquet::ColumnMetaData metadata;
-    metadata.type = ConvertInternalToParquetType(columns_[i]->type().type);
-    metadata.path_in_schema.push_back(
-        table_desc_->col_descs()[i + num_clustering_cols].name());
-    metadata.codec = columns_[i]->GetParquetCodec();
-    current_row_group_->columns[i].__set_meta_data(metadata);
-  }
-
-  return Status::OK();
-}
-
-int64_t HdfsParquetTableWriter::MinBlockSize(int64_t num_file_cols) const {
-  // See file_size_limit_ calculation in InitNewFile().
-  return 3 * DEFAULT_DATA_PAGE_SIZE * num_file_cols;
-}
-
-uint64_t HdfsParquetTableWriter::default_block_size() const {
-  int64_t block_size;
-  if (state_->query_options().__isset.parquet_file_size &&
-      state_->query_options().parquet_file_size > 0) {
-    // If the user specified a value explicitly, use it. InitNewFile() will verify that
-    // the actual file's block size is sufficient.
-    block_size = state_->query_options().parquet_file_size;
-  } else {
-    block_size = HDFS_BLOCK_SIZE;
-    // Blocks are usually HDFS_BLOCK_SIZE bytes, unless there are many columns, in
-    // which case a per-column minimum kicks in.
-    block_size = max(block_size, MinBlockSize(columns_.size()));
-  }
-  // HDFS does not like block sizes that are not aligned
-  return BitUtil::RoundUp(block_size, HDFS_BLOCK_ALIGNMENT);
-}
-
-Status HdfsParquetTableWriter::InitNewFile() {
-  DCHECK(current_row_group_ == nullptr);
-
-  per_file_mem_pool_->Clear();
-
-  // Get the file limit
-  file_size_limit_ = output_->block_size;
-  if (file_size_limit_ < HDFS_MIN_FILE_SIZE) {
-    stringstream ss;
-    ss << "Hdfs file size (" << file_size_limit_ << ") is too small.";
-    return Status(ss.str());
-  }
-
-  // We want to output HDFS files that are no more than file_size_limit_.  If we
-  // go over the limit, HDFS will split the file into multiple blocks which
-  // is undesirable.  If we are under the limit, we potentially end up with more
-  // files than necessary.  Either way, it is not going to generate a invalid
-  // file.
-  // With arbitrary encoding schemes, it is not possible to know if appending
-  // a new row will push us over the limit until after encoding it.  Rolling back
-  // a row can be tricky as well so instead we will stop the file when it is
-  // 2 * DEFAULT_DATA_PAGE_SIZE * num_cols short of the limit. e.g. 50 cols with 8K data
-  // pages, means we stop 800KB shy of the limit.
-  // Data pages calculate their size precisely when they are complete so having
-  // a two page buffer guarantees we will never go over (unless there are huge values
-  // that require increasing the page size).
-  // TODO: this should be made dynamic based on the size of rows seen so far.
-  // This would for example, let us account for very long string columns.
-  const int64_t num_cols = columns_.size();
-  if (file_size_limit_ < MinBlockSize(num_cols)) {
-    stringstream ss;
-    ss << "Parquet file size " << file_size_limit_ << " bytes is too small for "
-       << "a table with " << num_cols << " non-partitioning columns. Set query option "
-       << "PARQUET_FILE_SIZE to at least " << MinBlockSize(num_cols) << ".";
-    return Status(ss.str());
-  }
-  file_size_limit_ -= 2 * DEFAULT_DATA_PAGE_SIZE * columns_.size();
-  DCHECK_GE(file_size_limit_,
-      static_cast<int64_t>(DEFAULT_DATA_PAGE_SIZE * columns_.size()));
-  file_pos_ = 0;
-  row_count_ = 0;
-  file_size_estimate_ = 0;
-
-  file_metadata_.row_groups.clear();
-  RETURN_IF_ERROR(AddRowGroup());
-  RETURN_IF_ERROR(WriteFileHeader());
-
-  return Status::OK();
-}
-
-Status HdfsParquetTableWriter::AppendRows(
-    RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
-  SCOPED_TIMER(parent_->encode_timer());
-  *new_file = false;
-  int limit;
-  if (row_group_indices.empty()) {
-    limit = batch->num_rows();
-  } else {
-    limit = row_group_indices.size();
-  }
-
-  bool all_rows = row_group_indices.empty();
-  for (; row_idx_ < limit;) {
-    TupleRow* current_row = all_rows ?
-        batch->GetRow(row_idx_) : batch->GetRow(row_group_indices[row_idx_]);
-    for (int j = 0; j < columns_.size(); ++j) {
-      RETURN_IF_ERROR(columns_[j]->AppendRow(current_row));
-    }
-    ++row_idx_;
-    ++row_count_;
-    ++output_->num_rows;
-
-    if (file_size_estimate_ > file_size_limit_) {
-      // This file is full.  We need a new file.
-      *new_file = true;
-      return Status::OK();
-    }
-  }
-
-  // We exhausted the batch, so we materialize the statistics before releasing the memory.
-  for (unique_ptr<BaseColumnWriter>& column : columns_) {
-    RETURN_IF_ERROR(column->MaterializeStatsValues());
-  }
-
-  // Reset the row_idx_ when we exhaust the batch.  We can exit before exhausting
-  // the batch if we run out of file space and will continue from the last index.
-  row_idx_ = 0;
-  return Status::OK();
-}
-
-Status HdfsParquetTableWriter::Finalize() {
-  SCOPED_TIMER(parent_->hdfs_write_timer());
-
-  // At this point we write out the rest of the file.  We first update the file
-  // metadata, now that all the values have been seen.
-  file_metadata_.num_rows = row_count_;
-
-  // Set the ordering used to write parquet statistics for columns in the file.
-  parquet::ColumnOrder col_order = parquet::ColumnOrder();
-  col_order.__set_TYPE_ORDER(parquet::TypeDefinedOrder());
-  file_metadata_.column_orders.assign(columns_.size(), col_order);
-  file_metadata_.__isset.column_orders = true;
-
-  RETURN_IF_ERROR(FlushCurrentRowGroup());
-  RETURN_IF_ERROR(WritePageIndex());
-  for (auto& column : columns_) column->Reset();
-  RETURN_IF_ERROR(WriteFileFooter());
-  *stats_.mutable_parquet_stats() = parquet_dml_stats_;
-  COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
-  return Status::OK();
-}
-
-void HdfsParquetTableWriter::Close() {
-  // Release all accumulated memory
-  for (int i = 0; i < columns_.size(); ++i) {
-    columns_[i]->Close();
-  }
-  reusable_col_mem_pool_->FreeAll();
-  per_file_mem_pool_->FreeAll();
-  compression_staging_buffer_.clear();
-}
-
-Status HdfsParquetTableWriter::WriteFileHeader() {
-  DCHECK_EQ(file_pos_, 0);
-  RETURN_IF_ERROR(Write(PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)));
-  file_pos_ += sizeof(PARQUET_VERSION_NUMBER);
-  file_size_estimate_ += sizeof(PARQUET_VERSION_NUMBER);
-  return Status::OK();
-}
-
-Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
-  if (current_row_group_ == nullptr) return Status::OK();
-
-  int num_clustering_cols = table_desc_->num_clustering_cols();
-  for (int i = 0; i < columns_.size(); ++i) {
-    int64_t data_page_offset, dict_page_offset;
-    // Flush this column.  This updates the final metadata sizes for this column.
-    RETURN_IF_ERROR(columns_[i]->Flush(&file_pos_, &data_page_offset, &dict_page_offset));
-    DCHECK_GT(data_page_offset, 0);
-
-    parquet::ColumnChunk& col_chunk = current_row_group_->columns[i];
-    parquet::ColumnMetaData& col_metadata = col_chunk.meta_data;
-    col_metadata.data_page_offset = data_page_offset;
-    if (dict_page_offset >= 0) {
-      col_metadata.__set_dictionary_page_offset(dict_page_offset);
-    }
-
-    BaseColumnWriter* col_writer = columns_[i].get();
-    col_metadata.num_values = col_writer->num_values();
-    col_metadata.total_uncompressed_size = col_writer->total_uncompressed_size();
-    col_metadata.total_compressed_size = col_writer->total_compressed_size();
-    current_row_group_->total_byte_size += col_writer->total_compressed_size();
-    current_row_group_->num_rows = col_writer->num_values();
-    current_row_group_->columns[i].file_offset = file_pos_;
-    const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    google::protobuf::Map<string,int64>* column_size_map =
-        parquet_dml_stats_.mutable_per_column_size();
-    (*column_size_map)[col_name] += col_writer->total_compressed_size();
-
-    // Write encodings and encoding stats for this column
-    col_metadata.encodings.clear();
-    for (parquet::Encoding::type encoding : col_writer->column_encodings_) {
-      col_metadata.encodings.push_back(encoding);
-    }
-
-    vector<parquet::PageEncodingStats> encoding_stats;
-    // Add dictionary page encoding stats
-    for (const auto& entry: col_writer->dict_encoding_stats_) {
-      parquet::PageEncodingStats dict_enc_stat;
-      dict_enc_stat.page_type = parquet::PageType::DICTIONARY_PAGE;
-      dict_enc_stat.encoding = entry.first;
-      dict_enc_stat.count = entry.second;
-      encoding_stats.push_back(dict_enc_stat);
-    }
-    // Add data page encoding stats
-    for (const auto& entry: col_writer->data_encoding_stats_) {
-      parquet::PageEncodingStats data_enc_stat;
-      data_enc_stat.page_type = parquet::PageType::DATA_PAGE;
-      data_enc_stat.encoding = entry.first;
-      data_enc_stat.count = entry.second;
-      encoding_stats.push_back(data_enc_stat);
-    }
-    col_metadata.__set_encoding_stats(encoding_stats);
-
-    // Build column statistics and add them to the header.
-    col_writer->EncodeRowGroupStats(&current_row_group_->columns[i].meta_data);
-
-    // Since we don't supported complex schemas, all columns should have the same
-    // number of values.
-    DCHECK_EQ(current_row_group_->columns[0].meta_data.num_values,
-        col_writer->num_values());
-
-    // Metadata for this column is complete, write it out to file.  The column metadata
-    // goes at the end so that when we have collocated files, the column data can be
-    // written without buffering.
-    uint8_t* buffer = nullptr;
-    uint32_t len = 0;
-    RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer(
-        &current_row_group_->columns[i], &len, &buffer));
-    RETURN_IF_ERROR(Write(buffer, len));
-    file_pos_ += len;
-  }
-
-  // Populate RowGroup::sorting_columns with all columns specified by the Frontend.
-  for (int col_idx : parent_->sort_columns()) {
-    current_row_group_->sorting_columns.push_back(parquet::SortingColumn());
-    parquet::SortingColumn& sorting_column = current_row_group_->sorting_columns.back();
-    sorting_column.column_idx = col_idx;
-    sorting_column.descending = false;
-    sorting_column.nulls_first = false;
-  }
-  current_row_group_->__isset.sorting_columns =
-      !current_row_group_->sorting_columns.empty();
-
-  current_row_group_ = nullptr;
-  return Status::OK();
-}
-
-Status HdfsParquetTableWriter::WritePageIndex() {
-  if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
-
-  // Currently Impala only write Parquet files with a single row group. The current
-  // page index logic depends on this behavior as it only keeps one row group's
-  // statistics in memory.
-  DCHECK_EQ(file_metadata_.row_groups.size(), 1);
-
-  parquet::RowGroup* row_group = &(file_metadata_.row_groups[0]);
-  // Write out the column indexes.
-  for (int i = 0; i < columns_.size(); ++i) {
-    auto& column = *columns_[i];
-    if (!column.valid_column_index_) continue;
-    column.column_index_.__set_boundary_order(
-        column.row_group_stats_base_->GetBoundaryOrder());
-    // We always set null_counts.
-    column.column_index_.__isset.null_counts = true;
-    uint8_t* buffer = nullptr;
-    uint32_t len = 0;
-    RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer(
-        &column.column_index_, &len, &buffer));
-    RETURN_IF_ERROR(Write(buffer, len));
-    // Update the column_index_offset and column_index_length of the ColumnChunk
-    row_group->columns[i].__set_column_index_offset(file_pos_);
-    row_group->columns[i].__set_column_index_length(len);
-    file_pos_ += len;
-  }
-  // Write out the offset indexes.
-  for (int i = 0; i < columns_.size(); ++i) {
-    auto& column = *columns_[i];
-    uint8_t* buffer = nullptr;
-    uint32_t len = 0;
-    RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer(
-        &column.offset_index_, &len, &buffer));
-    RETURN_IF_ERROR(Write(buffer, len));
-    // Update the offset_index_offset and offset_index_length of the ColumnChunk
-    row_group->columns[i].__set_offset_index_offset(file_pos_);
-    row_group->columns[i].__set_offset_index_length(len);
-    file_pos_ += len;
-  }
-  return Status::OK();
-}
-
-Status HdfsParquetTableWriter::WriteFileFooter() {
-  // Write file_meta_data
-  uint32_t file_metadata_len = 0;
-  uint8_t* buffer = nullptr;
-  RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer(
-      &file_metadata_, &file_metadata_len, &buffer));
-  RETURN_IF_ERROR(Write(buffer, file_metadata_len));
-
-  // Write footer
-  RETURN_IF_ERROR(Write<uint32_t>(file_metadata_len));
-  RETURN_IF_ERROR(Write(PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)));
-  return Status::OK();
-}