You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/04 01:07:46 UTC
[4/6] impala git commit: IMPALA-7190: Remove unsupported format writer support
IMPALA-7190: Remove unsupported format writer support
This patch removes write support for unsupported formats such as Sequence,
Avro, and compressed text. The related query options
ALLOW_UNSUPPORTED_FORMATS and SEQ_COMPRESSION_MODE have also been moved
to the REMOVED query options type.
Testing:
Ran exhaustive build.
Change-Id: I821dc7495a901f1658daa500daf3791b386c7185
Reviewed-on: http://gerrit.cloudera.org:8080/10823
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/30e82c63
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/30e82c63
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/30e82c63
Branch: refs/heads/master
Commit: 30e82c63ecdd56ded10fed931d95ab6d994b9244
Parents: 6f52ce1
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Jun 25 18:11:08 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 3 20:34:27 2018 +0000
----------------------------------------------------------------------
be/src/exec/CMakeLists.txt | 2 -
be/src/exec/hdfs-avro-table-writer.cc | 295 ---------------
be/src/exec/hdfs-avro-table-writer.h | 121 -------
be/src/exec/hdfs-sequence-table-writer.cc | 361 -------------------
be/src/exec/hdfs-sequence-table-writer.h | 194 ----------
be/src/exec/hdfs-table-sink.cc | 48 +--
be/src/exec/hdfs-text-table-writer.cc | 61 +---
be/src/exec/hdfs-text-table-writer.h | 9 -
be/src/service/query-options-test.cc | 2 -
be/src/service/query-options.cc | 16 -
be/src/service/query-options.h | 5 +-
common/thrift/ImpalaInternalService.thrift | 6 -
common/thrift/ImpalaService.thrift | 6 +-
.../apache/impala/planner/PlannerTestBase.java | 1 -
testdata/bad_avro_snap/README | 4 +-
.../queries/QueryTest/avro-writer.test | 43 ---
.../queries/QueryTest/seq-writer.test | 308 ----------------
.../functional-query/queries/QueryTest/set.test | 3 -
.../queries/QueryTest/text-writer.test | 47 ---
.../queries/QueryTest/unsupported-writers.test | 77 ++++
tests/common/test_dimensions.py | 13 -
tests/hs2/test_hs2.py | 8 +-
tests/metadata/test_partition_metadata.py | 26 +-
tests/query_test/test_compressed_formats.py | 62 +---
tests/shell/test_shell_interactive.py | 10 +-
25 files changed, 121 insertions(+), 1607 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 1753cb0..4544b95 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -56,12 +56,10 @@ add_library(Exec
hdfs-rcfile-scanner.cc
hdfs-sequence-scanner.cc
hdfs-avro-scanner.cc
- hdfs-avro-table-writer.cc
hdfs-avro-scanner-ir.cc
hdfs-plugin-text-scanner.cc
hdfs-text-scanner.cc
hdfs-text-table-writer.cc
- hdfs-sequence-table-writer.cc
hdfs-parquet-scanner.cc
hdfs-parquet-scanner-ir.cc
hdfs-parquet-table-writer.cc
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc
deleted file mode 100644
index 3ce296d..0000000
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ /dev/null
@@ -1,295 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hdfs-avro-table-writer.h"
-
-#include <vector>
-#include <hdfs.h>
-#include <boost/scoped_ptr.hpp>
-#include <stdlib.h>
-#include <gutil/strings/substitute.h>
-
-#include "exec/exec-node.h"
-#include "exec/hdfs-table-sink.h"
-#include "util/compress.h"
-#include "util/hdfs-util.h"
-#include "util/uid-util.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/raw-value.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/hdfs-fs-cache.h"
-#include "runtime/types.h"
-#include "util/runtime-profile-counters.h"
-#include "write-stream.inline.h"
-
-#include "common/names.h"
-
-using namespace strings;
-using namespace impala;
-
-const uint8_t OBJ1[4] = {'O', 'b', 'j', 1};
-const char* AVRO_SCHEMA_STR = "avro.schema";
-const char* AVRO_CODEC_STR = "avro.codec";
-const THdfsCompression::type AVRO_DEFAULT_CODEC = THdfsCompression::SNAPPY;
-// Desired size of each Avro block (bytes); actual block size will vary +/- the
-// size of a row. This is approximate size of the block before compression.
-const int DEFAULT_AVRO_BLOCK_SIZE = 64 * 1024;
-
-HdfsAvroTableWriter::HdfsAvroTableWriter(HdfsTableSink* parent,
- RuntimeState* state, OutputPartition* output,
- const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc)
- : HdfsTableWriter(parent, state, output, partition, table_desc),
- unflushed_rows_(0) {
- mem_pool_.reset(new MemPool(parent->mem_tracker()));
-}
-
-void HdfsAvroTableWriter::ConsumeRow(TupleRow* row) {
- ++unflushed_rows_;
- int num_non_partition_cols =
- table_desc_->num_cols() - table_desc_->num_clustering_cols();
- for (int j = 0; j < num_non_partition_cols; ++j) {
- void* value = output_expr_evals_[j]->GetValue(row);
- AppendField(output_expr_evals_[j]->root().type(), value);
- }
-}
-
-inline void HdfsAvroTableWriter::AppendField(const ColumnType& type, const void* value) {
- // Each avro field is written as union, which is a ZLong indicating the union
- // field followed by the encoded value. Impala/Hive always stores values as
- // a union of [ColumnType, NULL].
- // TODO: the writer may be asked to write [NULL, ColumnType] unions. It is wrong
- // for us to assume [ColumnType, NULL].
-
- if (value == NULL) {
- // indicate the second field of the union
- out_.WriteZLong(1);
- // No bytes are written for a null value.
- return;
- }
-
- // indicate that we are using the first field of the union
- out_.WriteZLong(0);
-
- switch (type.type) {
- case TYPE_BOOLEAN:
- out_.WriteByte(*reinterpret_cast<const char*>(value));
- break;
- case TYPE_TINYINT:
- out_.WriteZInt(*reinterpret_cast<const int8_t*>(value));
- break;
- case TYPE_SMALLINT:
- out_.WriteZInt(*reinterpret_cast<const int16_t*>(value));
- break;
- case TYPE_INT:
- out_.WriteZInt(*reinterpret_cast<const int32_t*>(value));
- break;
- case TYPE_BIGINT:
- out_.WriteZLong(*reinterpret_cast<const int64_t*>(value));
- break;
- case TYPE_FLOAT:
- out_.WriteBytes(4, reinterpret_cast<const char*>(value));
- break;
- case TYPE_DOUBLE:
- out_.WriteBytes(8, reinterpret_cast<const char*>(value));
- break;
- case TYPE_STRING: {
- const StringValue& sv = *reinterpret_cast<const StringValue*>(value);
- out_.WriteZLong(sv.len);
- out_.WriteBytes(sv.len, sv.ptr);
- break;
- }
- case TYPE_DECIMAL: {
- int byte_size = ColumnType::GetDecimalByteSize(type.precision);
- out_.WriteZLong(byte_size);
-#if __BYTE_ORDER == __LITTLE_ENDIAN
- char tmp[16];
- BitUtil::ByteSwap(tmp, value, byte_size);
- out_.WriteBytes(byte_size, tmp);
-#else
- out_.WriteBytes(byte_size, reinterpret_cast<const char*>(value));
-#endif
- break;
- }
- case TYPE_TIMESTAMP:
- case TYPE_BINARY:
- case INVALID_TYPE:
- case TYPE_NULL:
- case TYPE_DATE:
- case TYPE_DATETIME:
- default:
- DCHECK(false);
- }
-}
-
-Status HdfsAvroTableWriter::Init() {
- // create the Sync marker
- sync_marker_ = GenerateUUIDString();
-
- THdfsCompression::type codec = AVRO_DEFAULT_CODEC;
- if (state_->query_options().__isset.compression_codec) {
- codec = state_->query_options().compression_codec;
- }
-
- // sets codec_name_ and compressor_
- codec_type_ = codec;
- switch (codec) {
- case THdfsCompression::SNAPPY:
- codec_name_ = "snappy";
- break;
- case THdfsCompression::DEFLATE:
- codec_name_ = "deflate";
- break;
- case THdfsCompression::NONE:
- codec_name_ = "null";
- return Status::OK();
- default:
- const char* name = _THdfsCompression_VALUES_TO_NAMES.find(codec)->second;
- return Status(Substitute(
- "Avro only supports NONE, DEFLATE, and SNAPPY codecs; unsupported codec $0",
- name));
- }
- RETURN_IF_ERROR(Codec::CreateCompressor(mem_pool_.get(), true, codec, &compressor_));
- DCHECK(compressor_.get() != NULL);
-
- return Status::OK();
-}
-
-void HdfsAvroTableWriter::Close() {
- mem_pool_->FreeAll();
-}
-
-Status HdfsAvroTableWriter::AppendRows(
- RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
- int32_t limit;
- bool all_rows = row_group_indices.empty();
- if (all_rows) {
- limit = batch->num_rows();
- } else {
- limit = row_group_indices.size();
- }
- COUNTER_ADD(parent_->rows_inserted_counter(), limit);
-
- {
- SCOPED_TIMER(parent_->encode_timer());
- for (int row_idx = 0; row_idx < limit; ++row_idx) {
- TupleRow* row = all_rows ?
- batch->GetRow(row_idx) : batch->GetRow(row_group_indices[row_idx]);
- ConsumeRow(row);
- }
- }
-
- if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) RETURN_IF_ERROR(Flush());
- *new_file = false;
- return Status::OK();
-}
-
-Status HdfsAvroTableWriter::WriteFileHeader() {
- out_.Clear();
- out_.WriteBytes(4, reinterpret_cast<const uint8_t*>(OBJ1));
-
- // Write 'File Metadata' as an encoded avro map
- // number of key/value pairs in the map
- out_.WriteZLong(2);
-
- // Schema information
- out_.WriteZLong(strlen(AVRO_SCHEMA_STR));
- out_.WriteBytes(strlen(AVRO_SCHEMA_STR), AVRO_SCHEMA_STR);
- const string& avro_schema = table_desc_->avro_schema();
- out_.WriteZLong(avro_schema.size());
- out_.WriteBytes(avro_schema.size(), avro_schema.data());
-
- // codec information
- out_.WriteZLong(strlen(AVRO_CODEC_STR));
- out_.WriteBytes(strlen(AVRO_CODEC_STR), AVRO_CODEC_STR);
- out_.WriteZLong(codec_name_.size());
- out_.WriteBytes(codec_name_.size(), codec_name_.data());
-
- // Write end of map marker
- out_.WriteZLong(0);
-
- out_.WriteBytes(sync_marker_.size(), sync_marker_.data());
-
- const string& text = out_.String();
- RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(text.c_str()),
- text.size()));
- out_.Clear();
- return Status::OK();
-}
-
-Status HdfsAvroTableWriter::Flush() {
- if (unflushed_rows_ == 0) return Status::OK();
-
- WriteStream header;
- // 1. Count of objects in this block
- header.WriteZLong(unflushed_rows_);
-
- const uint8_t* output;
- int64_t output_length;
- // Snappy format requires a CRC after the compressed data
- uint32_t crc;
- const string& text = out_.String();
-
- if (codec_type_ != THdfsCompression::NONE) {
- SCOPED_TIMER(parent_->compress_timer());
- uint8_t* temp;
- RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
- reinterpret_cast<const uint8_t*>(text.data()), &output_length, &temp));
- output = temp;
- if (codec_type_ == THdfsCompression::SNAPPY) {
- crc = SnappyCompressor::ComputeChecksum(
- text.size(), reinterpret_cast<const uint8_t*>(text.data()));
- }
- } else {
- output = reinterpret_cast<const uint8_t*>(text.data());
- output_length = out_.Size();
- }
-
- // 2. length of serialized objects
- if (codec_type_ == THdfsCompression::SNAPPY) {
- // + 4 for the CRC checksum at the end of the compressed block
- header.WriteZLong(output_length + 4);
- } else {
- header.WriteZLong(output_length);
- }
-
- const string& head = header.String();
- {
- SCOPED_TIMER(parent_->hdfs_write_timer());
- // Flush (1) and (2) to HDFS
- RETURN_IF_ERROR(
- Write(reinterpret_cast<const uint8_t*>(head.data()), head.size()));
- // 3. serialized objects
- RETURN_IF_ERROR(Write(output, output_length));
-
- // Write CRC checksum
- if (codec_type_ == THdfsCompression::SNAPPY) {
- RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(&crc), sizeof(uint32_t)));
- }
- }
-
- // 4. sync marker
- RETURN_IF_ERROR(
- Write(reinterpret_cast<const uint8_t*>(sync_marker_.data()), sync_marker_.size()));
-
- out_.Clear();
- unflushed_rows_ = 0;
- return Status::OK();
-}
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-avro-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h
deleted file mode 100644
index 6966860..0000000
--- a/be/src/exec/hdfs-avro-table-writer.h
+++ /dev/null
@@ -1,121 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_EXEC_HDFS_AVRO_WRITER_H
-#define IMPALA_EXEC_HDFS_AVRO_WRITER_H
-
-#include <hdfs.h>
-#include <sstream>
-#include <string>
-
-#include "common/status.h"
-#include "exec/hdfs-table-writer.h"
-#include "runtime/mem-pool.h"
-#include "util/codec.h"
-#include "exec/write-stream.h"
-
-namespace impala {
-
-struct ColumnType;
-class HdfsTableSink;
-class RuntimeState;
-class ScalarExprEvaluator;
-class TupleDescriptor;
-class TupleRow;
-struct OutputPartition;
-struct StringValue;
-
-/// Consumes rows and outputs the rows into an Avro file in HDFS
-/// Each Avro file contains a block of records (rows). The file metadata specifies the
-/// schema of the records in addition to the name of the codec, if any, used to compress
-/// blocks. The structure is:
-/// [ Metadata ]
-/// [ Sync Marker ]
-/// [ Data Block ]
-/// ...
-/// [ Data Block ]
-//
-/// Each Data Block consists of:
-/// [ Number of Rows in Block ]
-/// [ Size of serialized objects, after compression ]
-/// [ Serialized objects, compressed ]
-/// [ Sync Marker ]
-//
-/// If compression is used, each block is compressed individually. The block size defaults
-/// to about 64KB before compression.
-/// This writer implements the Avro 1.7.7 spec:
-/// http://avro.apache.org/docs/1.7.7/spec.html
-class HdfsAvroTableWriter : public HdfsTableWriter {
- public:
- HdfsAvroTableWriter(HdfsTableSink* parent,
- RuntimeState* state, OutputPartition* output,
- const HdfsPartitionDescriptor* partition,
- const HdfsTableDescriptor* table_desc);
-
- virtual ~HdfsAvroTableWriter() { }
-
- virtual Status Init() override;
- virtual Status Finalize() override { return Flush(); }
- virtual Status InitNewFile() override { return WriteFileHeader(); }
- virtual void Close() override;
- virtual uint64_t default_block_size() const override { return 0; }
- virtual std::string file_extension() const override { return "avro"; }
-
- /// Outputs the given rows into an HDFS sequence file. The rows are buffered
- /// to fill a sequence file block.
- virtual Status AppendRows(RowBatch* rows,
- const std::vector<int32_t>& row_group_indices, bool* new_file) override;
-
- private:
- /// Processes a single row, appending to out_
- void ConsumeRow(TupleRow* row);
-
- /// Adds an encoded field to out_
- inline void AppendField(const ColumnType& type, const void* value);
-
- /// Writes the Avro file header to HDFS
- Status WriteFileHeader() WARN_UNUSED_RESULT;
-
- /// Writes the contents of out_ to HDFS as a single Avro file block.
- /// Returns an error if write to HDFS fails.
- Status Flush() WARN_UNUSED_RESULT;
-
- /// Buffer which holds accumulated output
- WriteStream out_;
-
- /// Memory pool used by codec to allocate output buffer.
- /// Owned by this class. Initialized using parent's memtracker.
- boost::scoped_ptr<MemPool> mem_pool_;
-
- /// Number of rows consumed since last flush
- uint64_t unflushed_rows_;
-
- /// Name of codec, only set if codec_type_ != NONE
- std::string codec_name_;
-
- /// Type of the codec, will be NONE if no compression is used
- THdfsCompression::type codec_type_;
-
- /// The codec for compressing, only set if codec_type_ != NONE
- boost::scoped_ptr<Codec> compressor_;
-
- /// 16 byte sync marker (a uuid)
- std::string sync_marker_;
-};
-
-} // namespace impala
-#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
deleted file mode 100644
index 42a70f0..0000000
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ /dev/null
@@ -1,361 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hdfs-sequence-table-writer.h"
-#include "exec/write-stream.inline.h"
-#include "exec/exec-node.h"
-#include "util/hdfs-util.h"
-#include "util/uid-util.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/raw-value.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/hdfs-fs-cache.h"
-#include "util/runtime-profile-counters.h"
-
-#include <vector>
-#include <hdfs.h>
-#include <boost/scoped_ptr.hpp>
-#include <stdlib.h>
-
-#include "common/names.h"
-
-namespace impala {
-
-const uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
-const char* HdfsSequenceTableWriter::VALUE_CLASS_NAME = "org.apache.hadoop.io.Text";
-const char* HdfsSequenceTableWriter::KEY_CLASS_NAME =
- "org.apache.hadoop.io.BytesWritable";
-
-HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent,
- RuntimeState* state, OutputPartition* output,
- const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc)
- : HdfsTableWriter(parent, state, output, partition, table_desc),
- mem_pool_(new MemPool(parent->mem_tracker())), compress_flag_(false),
- unflushed_rows_(0), record_compression_(false) {
- approx_block_size_ = 64 * 1024 * 1024;
- parent->mem_tracker()->Consume(approx_block_size_);
- field_delim_ = partition->field_delim();
- escape_char_ = partition->escape_char();
-}
-
-Status HdfsSequenceTableWriter::Init() {
- THdfsCompression::type codec = THdfsCompression::SNAPPY_BLOCKED;
- const TQueryOptions& query_options = state_->query_options();
- if (query_options.__isset.compression_codec) {
- codec = query_options.compression_codec;
- if (codec == THdfsCompression::SNAPPY) {
- // Seq file (and in general things that use hadoop.io.codec) always
- // mean snappy_blocked.
- codec = THdfsCompression::SNAPPY_BLOCKED;
- }
- }
- if (codec != THdfsCompression::NONE) {
- compress_flag_ = true;
- if (query_options.__isset.seq_compression_mode) {
- record_compression_ =
- query_options.seq_compression_mode == THdfsSeqCompressionMode::RECORD;
- }
- RETURN_IF_ERROR(Codec::GetHadoopCodecClassName(codec, &codec_name_));
- RETURN_IF_ERROR(Codec::CreateCompressor(
- mem_pool_.get(), true, codec_name_, &compressor_));
- DCHECK(compressor_.get() != NULL);
- }
-
- // create the Sync marker
- string uuid = GenerateUUIDString();
- uint8_t sync_neg1[20];
-
- ReadWriteUtil::PutInt(sync_neg1, static_cast<uint32_t>(-1));
- DCHECK(uuid.size() == 16);
- memcpy(sync_neg1 + sizeof(int32_t), uuid.data(), uuid.size());
- neg1_sync_marker_ = string(reinterpret_cast<char*>(sync_neg1), 20);
- sync_marker_ = uuid;
-
- return Status::OK();
-}
-
-Status HdfsSequenceTableWriter::AppendRows(
- RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
- int32_t limit;
- if (row_group_indices.empty()) {
- limit = batch->num_rows();
- } else {
- limit = row_group_indices.size();
- }
- COUNTER_ADD(parent_->rows_inserted_counter(), limit);
-
- bool all_rows = row_group_indices.empty();
- int num_non_partition_cols =
- table_desc_->num_cols() - table_desc_->num_clustering_cols();
- DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols)
- << parent_->DebugString();
-
- {
- SCOPED_TIMER(parent_->encode_timer());
- if (all_rows) {
- for (int row_idx = 0; row_idx < limit; ++row_idx) {
- RETURN_IF_ERROR(ConsumeRow(batch->GetRow(row_idx)));
- }
- } else {
- for (int row_idx = 0; row_idx < limit; ++row_idx) {
- TupleRow* row = batch->GetRow(row_group_indices[row_idx]);
- RETURN_IF_ERROR(ConsumeRow(row));
- }
- }
- }
-
- if (!compress_flag_) {
- out_.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
- }
-
- if (out_.Size() >= approx_block_size_) RETURN_IF_ERROR(Flush());
- *new_file = false;
- return Status::OK();
-}
-
-Status HdfsSequenceTableWriter::WriteFileHeader() {
- out_.WriteBytes(sizeof(SEQ6_CODE), SEQ6_CODE);
-
- // Setup to be correct key class
- out_.WriteText(strlen(KEY_CLASS_NAME),
- reinterpret_cast<const uint8_t*>(KEY_CLASS_NAME));
-
- // Setup to be correct value class
- out_.WriteText(strlen(VALUE_CLASS_NAME),
- reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME));
-
- // Flag for if compression is used
- out_.WriteBoolean(compress_flag_);
- // Only valid if compression is used. Indicates if block compression is used.
- out_.WriteBoolean(compress_flag_ && !record_compression_);
-
- // Output the name of our compression codec, parsed by readers
- if (compress_flag_) {
- out_.WriteText(codec_name_.size(),
- reinterpret_cast<const uint8_t*>(codec_name_.data()));
- }
-
- // Meta data is formated as an integer N followed by N*2 strings,
- // which are key-value pairs. Hive does not write meta data, so neither does Impala
- out_.WriteInt(0);
-
- // write the sync marker
- out_.WriteBytes(sync_marker_.size(), sync_marker_.data());
-
- string text = out_.String();
- RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(text.c_str()), text.size()));
- out_.Clear();
- return Status::OK();
-}
-
-Status HdfsSequenceTableWriter::WriteCompressedBlock() {
- WriteStream record;
- uint8_t *output;
- int64_t output_length;
- DCHECK(compress_flag_);
-
- // Add a sync marker to start of the block
- record.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
-
- // Output the number of rows in this block
- record.WriteVLong(unflushed_rows_);
-
- // Output compressed key-lengths block-size & compressed key-lengths block.
- // The key-lengths block contains byte value of 4 as a key length for each row (this is
- // what Hive does).
- string key_lengths_text(unflushed_rows_, '\x04');
- {
- SCOPED_TIMER(parent_->compress_timer());
- RETURN_IF_ERROR(compressor_->ProcessBlock(false, key_lengths_text.size(),
- reinterpret_cast<const uint8_t*>(key_lengths_text.data()), &output_length,
- &output));
- }
- record.WriteVInt(output_length);
- record.WriteBytes(output_length, output);
-
- // Output compressed keys block-size & compressed keys block.
- // The keys block contains "\0\0\0\0" byte sequence as a key for each row (this is what
- // Hive does).
- string keys_text(unflushed_rows_ * 4, '\0');
- {
- SCOPED_TIMER(parent_->compress_timer());
- RETURN_IF_ERROR(compressor_->ProcessBlock(false, keys_text.size(),
- reinterpret_cast<const uint8_t*>(keys_text.data()), &output_length, &output));
- }
- record.WriteVInt(output_length);
- record.WriteBytes(output_length, output);
-
- // Output compressed value-lengths block-size & compressed value-lengths block
- string value_lengths_text = out_value_lengths_block_.String();
- {
- SCOPED_TIMER(parent_->compress_timer());
- RETURN_IF_ERROR(compressor_->ProcessBlock(false, value_lengths_text.size(),
- reinterpret_cast<const uint8_t*>(value_lengths_text.data()), &output_length, &output));
- }
- record.WriteVInt(output_length);
- record.WriteBytes(output_length, output);
-
- // Output compressed values block-size & compressed values block
- string text = out_.String();
- {
- SCOPED_TIMER(parent_->compress_timer());
- RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
- reinterpret_cast<const uint8_t*>(text.data()), &output_length, &output));
- }
- record.WriteVInt(output_length);
- record.WriteBytes(output_length, output);
-
- string rec = record.String();
- RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(rec.data()), rec.size()));
- return Status::OK();
-}
-
-inline void HdfsSequenceTableWriter::WriteEscapedString(const StringValue* str_val,
- WriteStream* buf) {
- for (int i = 0; i < str_val->len; ++i) {
- if (str_val->ptr[i] == field_delim_ || str_val->ptr[i] == escape_char_) {
- buf->WriteByte(escape_char_);
- }
- buf->WriteByte(str_val->ptr[i]);
- }
-}
-
-void HdfsSequenceTableWriter::EncodeRow(TupleRow* row, WriteStream* buf) {
- // TODO Unify with text table writer
- int num_non_partition_cols =
- table_desc_->num_cols() - table_desc_->num_clustering_cols();
- DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols)
- << parent_->DebugString();
- for (int j = 0; j < num_non_partition_cols; ++j) {
- void* value = output_expr_evals_[j]->GetValue(row);
- if (value != NULL) {
- if (output_expr_evals_[j]->root().type().type == TYPE_STRING) {
- WriteEscapedString(reinterpret_cast<const StringValue*>(value), &row_buf_);
- } else {
- string str;
- output_expr_evals_[j]->PrintValue(value, &str);
- buf->WriteBytes(str.size(), str.data());
- }
- } else {
- // NULLs in hive are encoded based on the 'serialization.null.format' property.
- const string& null_val = table_desc_->null_column_value();
- buf->WriteBytes(null_val.size(), null_val.data());
- }
- // Append field delimiter.
- if (j + 1 < num_non_partition_cols) {
- buf->WriteByte(field_delim_);
- }
- }
-}
-
-inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
- ++unflushed_rows_;
- row_buf_.Clear();
- if (compress_flag_ && !record_compression_) {
- // Output row for a block compressed sequence file.
- // Value block: Write the length as a vlong and then write the contents.
- EncodeRow(row, &row_buf_);
- out_.WriteVLong(row_buf_.Size());
- out_.WriteBytes(row_buf_.Size(), row_buf_.String().data());
- // Value-lengths block: Write the number of bytes we have just written to out_ as
- // vlong
- out_value_lengths_block_.WriteVLong(
- ReadWriteUtil::VLongRequiredBytes(row_buf_.Size()) + row_buf_.Size());
- return Status::OK();
- }
-
- EncodeRow(row, &row_buf_);
-
- const uint8_t* value_bytes;
- int64_t value_length;
- string text = row_buf_.String();
- if (compress_flag_) {
- // apply compression to row_buf_
- // the length of the buffer must be prefixed to the buffer prior to compression
- //
- // TODO this incurs copy overhead to place the length in front of the
- // buffer prior to compression. We may want to rewrite to avoid copying.
- row_buf_.Clear();
- // encoding as "Text" writes the length before the text
- row_buf_.WriteText(text.size(), reinterpret_cast<const uint8_t*>(&text.data()[0]));
- text = row_buf_.String();
- uint8_t *tmp;
- {
- SCOPED_TIMER(parent_->compress_timer());
- RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
- reinterpret_cast<const uint8_t*>(text.data()), &value_length, &tmp));
- }
- value_bytes = tmp;
- } else {
- value_length = text.size();
- DCHECK_EQ(value_length, row_buf_.Size());
- value_bytes = reinterpret_cast<const uint8_t*>(text.data());
- }
-
- int rec_len = value_length;
- // if the record is compressed, the length is part of the compressed text
- // if not, then we need to write the length (below) and account for it's size
- if (!compress_flag_) {
- rec_len += ReadWriteUtil::VLongRequiredBytes(value_length);
- }
- // The record contains the key, account for it's size (we use "\0\0\0\0" byte sequence
- // as a key just like Hive).
- rec_len += 4;
-
- // Length of the record (incl. key and value length)
- out_.WriteInt(rec_len);
-
- // Write length of the key and the key
- out_.WriteInt(4);
- out_.WriteBytes(4, "\0\0\0\0");
-
- // if the record is compressed, the length is part of the compressed text
- if (!compress_flag_) out_.WriteVLong(value_length);
-
- // write out the value (possibly compressed)
- out_.WriteBytes(value_length, value_bytes);
- return Status::OK();
-}
-
-Status HdfsSequenceTableWriter::Flush() {
- if (unflushed_rows_ == 0) return Status::OK();
-
- SCOPED_TIMER(parent_->hdfs_write_timer());
-
- if (compress_flag_ && !record_compression_) {
- RETURN_IF_ERROR(WriteCompressedBlock());
- } else {
- string out_str = out_.String();
- RETURN_IF_ERROR(
- Write(reinterpret_cast<const uint8_t*>(out_str.data()), out_str.size()));
- }
- out_.Clear();
- out_value_lengths_block_.Clear();
- unflushed_rows_ = 0;
- return Status::OK();
-}
-
-void HdfsSequenceTableWriter::Close() {
- // TODO: double check there is no memory leak.
- parent_->mem_tracker()->Release(approx_block_size_);
- mem_pool_->FreeAll();
-}
-
-} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-sequence-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h
deleted file mode 100644
index f315920..0000000
--- a/be/src/exec/hdfs-sequence-table-writer.h
+++ /dev/null
@@ -1,194 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_EXEC_HDFS_SEQUENCE_WRITER_H
-#define IMPALA_EXEC_HDFS_SEQUENCE_WRITER_H
-
-#include <hdfs.h>
-#include <sstream>
-
-#include "runtime/descriptors.h"
-#include "exec/hdfs-table-sink.h"
-#include "exec/hdfs-table-writer.h"
-#include "util/codec.h"
-#include "write-stream.h"
-
-namespace impala {
-
-class Expr;
-class TupleDescriptor;
-class TupleRow;
-class RuntimeState;
-struct StringValue;
-struct OutputPartition;
-
-/// Sequence files are flat files consisting of binary key/value pairs. Essentially there
-/// are 3 different formats for sequence files depending on the 'compression_codec' and
-/// 'seq_compression_mode' query options:
-/// - Uncompressed sequence file format
-/// - Record-compressed sequence file format
-/// - Block-compressed sequence file format
-/// All of them share a common header described below.
-///
-/// Sequence File Header
-/// --------------------
-/// - version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number
-/// (e.g. SEQ4 or SEQ6)
-/// - keyClassName - key class
-/// - valueClassName - value class
-/// - compression - A boolean which specifies if compression is turned on for keys/values
-/// in this file.
-/// - blockCompression - A boolean which specifies if block-compression is turned on for
-/// keys/values in this file.
-/// - compression codec - compression codec class which is used for compression of keys
-/// and/or values (if compression is enabled).
-/// - metadata - SequenceFile.Metadata for this file.
-/// - sync - A 16 byte sync marker to denote end of the header.
-///
-/// Uncompressed Sequence File Format
-/// ---------------------------------
-/// - Header
-/// - Record
-/// - Record length
-/// - Key length
-/// - Key
-/// - Value
-/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so.
-///
-/// Record-Compressed Sequence File Format
-/// --------------------------------------
-/// - Header
-/// - Record
-/// - Record length
-/// - Key length
-/// - Key
-/// - Compressed Value
-/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so.
-///
-/// Block-Compressed Sequence File Format
-/// -------------------------------------
-/// - Header
-/// - Record Block
-/// - Uncompressed number of records in the block
-/// - Compressed key-lengths block-size
-/// - Compressed key-lengths block
-/// - Compressed keys block-size
-/// - Compressed keys block
-/// - Compressed value-lengths block-size
-/// - Compressed value-lengths block
-/// - Compressed values block-size
-/// - Compressed values block
-/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every block.
-/// The compressed blocks of key lengths and value lengths consist of the actual lengths
-/// of individual keys/values encoded in zero-compressed integer format.
-
-/// Consumes rows and outputs the rows into a sequence file in HDFS
-/// Output is buffered to fill sequence file blocks.
-class HdfsSequenceTableWriter : public HdfsTableWriter {
- public:
- HdfsSequenceTableWriter(HdfsTableSink* parent, RuntimeState* state,
- OutputPartition* output, const HdfsPartitionDescriptor* partition,
- const HdfsTableDescriptor* table_desc);
-
- ~HdfsSequenceTableWriter() { }
-
- virtual Status Init();
- virtual Status Finalize() { return Flush(); }
- virtual Status InitNewFile() { return WriteFileHeader(); }
- virtual void Close();
- virtual uint64_t default_block_size() const { return 0; }
- virtual std::string file_extension() const { return "seq"; }
-
- /// Outputs the given rows into an HDFS sequence file. The rows are buffered
- /// to fill a sequence file block.
- virtual Status AppendRows(
- RowBatch* rows, const std::vector<int32_t>& row_group_indices, bool* new_file);
-
- private:
- /// processes a single row, delegates to Compress or NoCompress ConsumeRow().
- inline Status ConsumeRow(TupleRow* row);
-
- /// writes the SEQ file header to HDFS
- Status WriteFileHeader();
-
- /// writes the contents of out_value_lengths_block_ and out_ as a single
- /// block-compressed record.
- Status WriteCompressedBlock();
-
- /// writes the tuple row to the given buffer; separates fields by field_delim_,
- /// escapes string.
- inline void EncodeRow(TupleRow* row, WriteStream* buf);
-
- /// writes the str_val to the buffer, escaping special characters
- inline void WriteEscapedString(const StringValue* str_val, WriteStream* buf);
-
- /// flushes the output -- clearing out_ and writing to HDFS
- /// if compress_flag_, will write contents of out_ as a single compressed block
- Status Flush();
-
- /// desired size of each block (bytes); actual block size will vary +/- the
- /// size of a row; this is before compression is applied.
- uint64_t approx_block_size_;
-
- /// buffer which holds accumulated output
- WriteStream out_;
-
- /// buffer which holds accumulated value-lengths output (used with block-compressed
- /// sequence files)
- WriteStream out_value_lengths_block_;
-
- /// Temporary Buffer for a single row
- WriteStream row_buf_;
-
- /// memory pool used by codec to allocate output buffer
- boost::scoped_ptr<MemPool> mem_pool_;
-
- /// true if compression is enabled
- bool compress_flag_;
-
- /// number of rows consumed since last flush
- uint64_t unflushed_rows_;
-
- /// name of codec, only set if compress_flag_
- std::string codec_name_;
- /// the codec for compressing, only set if compress_flag_
- boost::scoped_ptr<Codec> compressor_;
-
- /// true if compression is applied on each record individually
- bool record_compression_;
-
- /// Character delimiting fields
- char field_delim_;
-
- /// Escape character for text encoding
- char escape_char_;
-
- /// 16 byte sync marker (a uuid)
- std::string sync_marker_;
- /// A -1 infront of the sync marker, used in decompressed formats
- std::string neg1_sync_marker_;
-
- /// Name of java class to use when reading the keys
- static const char* KEY_CLASS_NAME;
- /// Name of java class to use when reading the values
- static const char* VALUE_CLASS_NAME;
- /// Magic characters used to identify the file type
- static const uint8_t SEQ6_CODE[4];
-};
-
-} // namespace impala
-#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index b6de7cf..9c46638 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -18,8 +18,6 @@
#include "exec/hdfs-table-sink.h"
#include "exec/hdfs-table-writer.h"
#include "exec/hdfs-text-table-writer.h"
-#include "exec/hdfs-sequence-table-writer.h"
-#include "exec/hdfs-avro-table-writer.h"
#include "exec/hdfs-parquet-table-writer.h"
#include "exec/exec-node.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
@@ -469,28 +467,20 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
output_partition->partition_descriptor = &partition_descriptor;
- bool allow_unsupported_formats =
- state->query_options().__isset.allow_unsupported_formats &&
- state->query_options().allow_unsupported_formats;
- if (!allow_unsupported_formats) {
- if (partition_descriptor.file_format() == THdfsFileFormat::SEQUENCE_FILE ||
- partition_descriptor.file_format() == THdfsFileFormat::AVRO) {
- stringstream error_msg;
- map<int, const char*>::const_iterator i =
- _THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format());
- error_msg << "Writing to table format " << i->second
- << " is not supported. Use query option ALLOW_UNSUPPORTED_FORMATS"
- " to override.";
- return Status(error_msg.str());
- }
- if (partition_descriptor.file_format() == THdfsFileFormat::TEXT &&
- state->query_options().__isset.compression_codec &&
- state->query_options().compression_codec != THdfsCompression::NONE) {
- stringstream error_msg;
- error_msg << "Writing to compressed text table is not supported. "
- "Use query option ALLOW_UNSUPPORTED_FORMATS to override.";
- return Status(error_msg.str());
- }
+ if (partition_descriptor.file_format() == THdfsFileFormat::SEQUENCE_FILE ||
+ partition_descriptor.file_format() == THdfsFileFormat::AVRO) {
+ stringstream error_msg;
+ map<int, const char*>::const_iterator i =
+ _THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format());
+ error_msg << "Writing to table format " << i->second << " is not supported.";
+ return Status(error_msg.str());
+ }
+ if (partition_descriptor.file_format() == THdfsFileFormat::TEXT &&
+ state->query_options().__isset.compression_codec &&
+ state->query_options().compression_codec != THdfsCompression::NONE) {
+ stringstream error_msg;
+ error_msg << "Writing to compressed text table is not supported. ";
+ return Status(error_msg.str());
}
// It is incorrect to initialize a writer if there are no rows to feed it. The writer
@@ -508,16 +498,6 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
new HdfsParquetTableWriter(
this, state, output_partition, &partition_descriptor, table_desc_));
break;
- case THdfsFileFormat::SEQUENCE_FILE:
- output_partition->writer.reset(
- new HdfsSequenceTableWriter(
- this, state, output_partition, &partition_descriptor, table_desc_));
- break;
- case THdfsFileFormat::AVRO:
- output_partition->writer.reset(
- new HdfsAvroTableWriter(
- this, state, output_partition, &partition_descriptor, table_desc_));
- break;
default:
stringstream error_msg;
map<int, const char*>::const_iterator i =
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-text-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-table-writer.cc b/be/src/exec/hdfs-text-table-writer.cc
index aaee773..f09b161 100644
--- a/be/src/exec/hdfs-text-table-writer.cc
+++ b/be/src/exec/hdfs-text-table-writer.cc
@@ -25,8 +25,6 @@
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/string-value.inline.h"
-#include "util/codec.h"
-#include "util/compress.h"
#include "util/hdfs-util.h"
#include "util/runtime-profile-counters.h"
@@ -35,13 +33,6 @@
#include "common/names.h"
-// Hdfs block size for compressed text.
-static const int64_t COMPRESSED_BLOCK_SIZE = 64 * 1024 * 1024;
-
-// Size to buffer before compression. We want this to be less than the block size
-// (compressed text is not splittable).
-static const int64_t COMPRESSED_BUFFERED_SIZE = 60 * 1024 * 1024;
-
namespace impala {
HdfsTextTableWriter::HdfsTextTableWriter(HdfsTableSink* parent,
@@ -61,41 +52,17 @@ HdfsTextTableWriter::HdfsTextTableWriter(HdfsTableSink* parent,
}
Status HdfsTextTableWriter::Init() {
- const TQueryOptions& query_options = state_->query_options();
- codec_ = THdfsCompression::NONE;
- if (query_options.__isset.compression_codec) {
- codec_ = query_options.compression_codec;
- if (codec_ == THdfsCompression::SNAPPY) {
- // hadoop.io.codec always means SNAPPY_BLOCKED. Alias the two.
- codec_ = THdfsCompression::SNAPPY_BLOCKED;
- }
- }
-
- if (codec_ != THdfsCompression::NONE) {
- mem_pool_.reset(new MemPool(parent_->mem_tracker()));
- RETURN_IF_ERROR(Codec::CreateCompressor(
- mem_pool_.get(), true, codec_, &compressor_));
- flush_size_ = COMPRESSED_BUFFERED_SIZE;
- } else {
- flush_size_ = HDFS_FLUSH_WRITE_SIZE;
- }
parent_->mem_tracker()->Consume(flush_size_);
return Status::OK();
}
void HdfsTextTableWriter::Close() {
parent_->mem_tracker()->Release(flush_size_);
- if (mem_pool_.get() != NULL) mem_pool_->FreeAll();
}
-uint64_t HdfsTextTableWriter::default_block_size() const {
- return compressor_.get() == NULL ? 0 : COMPRESSED_BLOCK_SIZE;
-}
+uint64_t HdfsTextTableWriter::default_block_size() const { return 0; }
-string HdfsTextTableWriter::file_extension() const {
- if (compressor_.get() == NULL) return "";
- return compressor_->file_extension();
-}
+string HdfsTextTableWriter::file_extension() const { return ""; }
Status HdfsTextTableWriter::AppendRows(
RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
@@ -152,12 +119,7 @@ Status HdfsTextTableWriter::AppendRows(
}
*new_file = false;
- if (rowbatch_stringstream_.tellp() >= flush_size_) {
- RETURN_IF_ERROR(Flush());
-
- // If compressed, start a new file (compressed data is not splittable).
- *new_file = compressor_.get() != NULL;
- }
+ if (rowbatch_stringstream_.tellp() >= flush_size_) RETURN_IF_ERROR(Flush());
return Status::OK();
}
@@ -178,22 +140,9 @@ Status HdfsTextTableWriter::InitNewFile() {
Status HdfsTextTableWriter::Flush() {
string rowbatch_string = rowbatch_stringstream_.str();
rowbatch_stringstream_.str(string());
- const uint8_t* uncompressed_data =
+ const uint8_t* data =
reinterpret_cast<const uint8_t*>(rowbatch_string.data());
- int64_t uncompressed_len = rowbatch_string.size();
- const uint8_t* data = uncompressed_data;
- int64_t len = uncompressed_len;
-
- if (compressor_.get() != NULL) {
- SCOPED_TIMER(parent_->compress_timer());
- uint8_t* compressed_data;
- int64_t compressed_len;
- RETURN_IF_ERROR(compressor_->ProcessBlock(false,
- uncompressed_len, uncompressed_data,
- &compressed_len, &compressed_data));
- data = compressed_data;
- len = compressed_len;
- }
+ int64_t len = rowbatch_string.size();
{
SCOPED_TIMER(parent_->hdfs_write_timer());
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-text-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-table-writer.h b/be/src/exec/hdfs-text-table-writer.h
index 589ed23..e2f6135 100644
--- a/be/src/exec/hdfs-text-table-writer.h
+++ b/be/src/exec/hdfs-text-table-writer.h
@@ -87,15 +87,6 @@ class HdfsTextTableWriter : public HdfsTableWriter {
/// Stringstream to buffer output. The stream is cleared between HDFS
/// Write calls to allow for the internal buffers to be reused.
std::stringstream rowbatch_stringstream_;
-
- /// Compression codec.
- THdfsCompression::type codec_;
-
- /// Compressor if compression is enabled.
- boost::scoped_ptr<Codec> compressor_;
-
- /// Memory pool to use with compressor_.
- boost::scoped_ptr<MemPool> mem_pool_;
};
}
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index e5bc48d..b9bda60 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -208,8 +208,6 @@ TEST(QueryOptions, SetEnumOptions) {
TParquetFallbackSchemaResolution, (POSITION, NAME)), true);
TestEnumCase(options, CASE(parquet_array_resolution, TParquetArrayResolution,
(THREE_LEVEL, TWO_LEVEL, TWO_LEVEL_THEN_THREE_LEVEL)), true);
- TestEnumCase(options, CASE(seq_compression_mode, THdfsSeqCompressionMode,
- (BLOCK, RECORD)), false);
TestEnumCase(options, CASE(compression_codec, THdfsCompression,
(NONE, GZIP, BZIP2, DEFAULT, SNAPPY, SNAPPY_BLOCKED)), false);
#undef CASE
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 2e3415f..1063fef 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -226,25 +226,9 @@ Status impala::SetQueryOption(const string& key, const string& value,
case TImpalaQueryOptions::NUM_SCANNER_THREADS:
query_options->__set_num_scanner_threads(atoi(value.c_str()));
break;
- case TImpalaQueryOptions::ALLOW_UNSUPPORTED_FORMATS:
- query_options->__set_allow_unsupported_formats(
- iequals(value, "true") || iequals(value, "1"));
- break;
case TImpalaQueryOptions::DEBUG_ACTION:
query_options->__set_debug_action(value.c_str());
break;
- case TImpalaQueryOptions::SEQ_COMPRESSION_MODE: {
- if (iequals(value, "block")) {
- query_options->__set_seq_compression_mode(THdfsSeqCompressionMode::BLOCK);
- } else if (iequals(value, "record")) {
- query_options->__set_seq_compression_mode(THdfsSeqCompressionMode::RECORD);
- } else {
- stringstream ss;
- ss << "Invalid sequence file compression mode: " << value;
- return Status(ss.str());
- }
- break;
- }
case TImpalaQueryOptions::COMPRESSION_CODEC: {
if (iequals(value, "none")) {
query_options->__set_compression_codec(THdfsCompression::NONE);
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index fce042c..01f6e74 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -44,8 +44,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
TImpalaQueryOptions::ALLOW_ERASURE_CODED_FILES + 1);\
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\
- TQueryOptionLevel::DEPRECATED)\
+ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
QUERY_OPT_FN(batch_size, BATCH_SIZE, TQueryOptionLevel::DEVELOPMENT)\
QUERY_OPT_FN(debug_action, DEBUG_ACTION, TQueryOptionLevel::DEVELOPMENT)\
REMOVED_QUERY_OPT_FN(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT)\
@@ -74,7 +73,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
QUERY_OPT_FN(buffer_pool_limit, BUFFER_POOL_LIMIT, TQueryOptionLevel::ADVANCED)\
QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT, TQueryOptionLevel::ADVANCED)\
QUERY_OPT_FN(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE, TQueryOptionLevel::REGULAR)\
+ REMOVED_QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\
QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD,\
TQueryOptionLevel::ADVANCED)\
QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS,\
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 6780138..120aebc 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -101,7 +101,6 @@ struct TQueryOptions {
5: optional i32 num_nodes = NUM_NODES_ALL
6: optional i64 max_scan_range_length = 0
7: optional i32 num_scanner_threads = 0
- 9: optional bool allow_unsupported_formats = 0
11: optional string debug_action = ""
12: optional i64 mem_limit = 0
14: optional CatalogObjects.THdfsCompression compression_codec
@@ -133,11 +132,6 @@ struct TQueryOptions {
// has no plan hints, and at least one table is missing relevant stats.
29: optional bool disable_unsafe_spills = 0
- // Mode for compression; RECORD, or BLOCK
- // This field only applies for certain file types and is ignored
- // by all other file types.
- 30: optional CatalogObjects.THdfsSeqCompressionMode seq_compression_mode
-
// If the number of rows that are processed for a single query is below the
// threshold, it will be executed on the coordinator only with codegen disabled
31: optional i32 exec_single_node_rows_threshold = 100
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 529af04..665144f 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -72,8 +72,7 @@ enum TImpalaQueryOptions {
// Number of scanner threads.
NUM_SCANNER_THREADS,
- // If true, Impala will try to execute on file formats that are not fully supported yet
- ALLOW_UNSUPPORTED_FORMATS,
+ ALLOW_UNSUPPORTED_FORMATS, // Removed
DEFAULT_ORDER_BY_LIMIT, // Removed
@@ -110,8 +109,7 @@ enum TImpalaQueryOptions {
// Leave blank to use default.
COMPRESSION_CODEC,
- // Mode for compressing sequence files; either BLOCK, RECORD, or DEFAULT
- SEQ_COMPRESSION_MODE,
+ SEQ_COMPRESSION_MODE, // Removed
// HBase scan query option. If set and > 0, HBASE_CACHING is the value for
// "hbase.client.Scan.setCaching()" when querying HBase table. Otherwise, use backend
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 54ad57f..b671a1e 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -390,7 +390,6 @@ public class PlannerTestBase extends FrontendTestBase {
protected TQueryOptions defaultQueryOptions() {
TQueryOptions options = new TQueryOptions();
options.setExplain_level(TExplainLevel.STANDARD);
- options.setAllow_unsupported_formats(true);
options.setExec_single_node_rows_threshold(0);
return options;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/bad_avro_snap/README
----------------------------------------------------------------------
diff --git a/testdata/bad_avro_snap/README b/testdata/bad_avro_snap/README
index 6271967..71eb398 100644
--- a/testdata/bad_avro_snap/README
+++ b/testdata/bad_avro_snap/README
@@ -1,6 +1,6 @@
String Data
-----------
-Created by modifying Impala's HdfsAvroTableWriter.
+Created by modifying Impala's HdfsAvroTableWriter (since removed).
These files' schemas have a single nullable string column 's'.
@@ -14,7 +14,7 @@ truncated_string.avro: contains one value, which is missing the last byte.
Float Data
----------
-Created by modifying Impala's HdfsAvroTableWriter.
+Created by modifying Impala's HdfsAvroTableWriter (since removed).
These files' schemas have a single nullable float column 'c1'.
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test b/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test
deleted file mode 100644
index 6dc0899..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test
+++ /dev/null
@@ -1,43 +0,0 @@
-====
----- QUERY
-drop table if exists __avro_write;
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-create table __avro_write (i int, s string, d double)
-stored as AVRO
-TBLPROPERTIES ('avro.schema.literal'='{
- "name": "my_record",
- "type": "record",
- "fields": [
- {"name":"i", "type":["int", "null"]},
- {"name":"s", "type":["string", "null"]},
- {"name":"d", "type":["double", "null"]}]}');
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __avro_write select 0, "a", 1.1;
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __avro_write select 1, "b", 2.2;
-====
----- QUERY
-select * from __avro_write;
----- RESULTS
-0,'a',1.1
-1,'b',2.2
----- TYPES
-INT,STRING,DOUBLE
-====
----- QUERY
-SET ALLOW_UNSUPPORTED_FORMATS=0;
-insert into __avro_write select 1, "b", 2.2;
----- CATCH
-Writing to table format AVRO is not supported. Use query option ALLOW_UNSUPPORTED_FORMATS
-====
----- QUERY
-drop table __avro_write;
-====
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
deleted file mode 100644
index 7e2363f..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
+++ /dev/null
@@ -1,308 +0,0 @@
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-create table __seq_write (i int, s string, d double)
-stored as SEQUENCEFILE;
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write select 0, "a", 1.1;
-====
----- QUERY
-SET COMPRESSION_CODEC=DEFAULT;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (1, "b", 2.2);
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (2, "c", 3.3);
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (3, "d", 4.4);
-====
----- QUERY
-SET COMPRESSION_CODEC=GZIP;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (4, "e", 5.5);
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write select 5, "a", 1.1;
-====
----- QUERY
-SET COMPRESSION_CODEC=DEFAULT;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (6, "b", 2.2);
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (7, "c", 3.3);
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (8, "d", 4.4);
-====
----- QUERY
-SET COMPRESSION_CODEC=GZIP;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (9, "e", 5.5);
-====
----- QUERY
-SET ALLOW_UNSUPPORTED_FORMATS=0;
-insert into __seq_write values (4, "e", 5.5);
----- CATCH
-Writing to table format SEQUENCE_FILE is not supported. Use query option
-====
----- QUERY
-select * from __seq_write;
----- RESULTS
-0,'a',1.1
-1,'b',2.2
-2,'c',3.3
-3,'d',4.4
-4,'e',5.5
-5,'a',1.1
-6,'b',2.2
-7,'c',3.3
-8,'d',4.4
-9,'e',5.5
----- TYPES
-INT,STRING,DOUBLE
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with NONE+RECORD and then read
-# it back
-SET COMPRESSION_CODEC=NONE;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_none_rec like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_none_rec partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
- (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_none_rec;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with DEFAULT+RECORD and then
-# read it back
-SET COMPRESSION_CODEC=DEFAULT;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_def_rec like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_def_rec partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
- (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_def_rec;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+RECORD and
-# then read it back
-SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_snapb_rec like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_snapb_rec partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
- (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_snapb_rec;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with SNAPPY+RECORD and then read
-# it back
-SET COMPRESSION_CODEC=SNAPPY;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_snap_rec like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_snap_rec partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
- (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_snap_rec;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with GZIP+RECORD and then read
-# it back
-SET COMPRESSION_CODEC=GZIP;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_gzip_rec like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_gzip_rec partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
- (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_gzip_rec;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with NONE+BLOCK and then read it
-# back
-SET COMPRESSION_CODEC=NONE;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_none_block like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_none_block partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
- (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_none_block;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with DEFAULT+BLOCK and then read
-# it back
-SET COMPRESSION_CODEC=DEFAULT;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_def_block like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_def_block partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
- (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_def_block;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+BLOCK and
-# then read it back
-SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_snapb_block like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_snapb_block partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
- (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_snapb_block;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with SNAPPY+BLOCK and then read
-# it back
-SET COMPRESSION_CODEC=SNAPPY;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_snap_block like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_snap_block partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
- (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_snap_block;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with GZIP+BLOCK and then read it
-# back
-SET COMPRESSION_CODEC=GZIP;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_gzip_block like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_gzip_block partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
- (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_gzip_block;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-5407: Create a table containing seq files with GZIP+RECORD. If the number of
-# impalad workers is three, three files will be created, two of which are large enough
-# (> 64MB) to force multiple flushes. Make sure that the files have been created
-# successfully.
-SET COMPRESSION_CODEC=GZIP;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table catalog_sales_seq_gzip_rec like tpcds.catalog_sales stored as SEQUENCEFILE;
-insert into catalog_sales_seq_gzip_rec select * from tpcds.catalog_sales;
-====
----- QUERY
-select count(*) from catalog_sales_seq_gzip_rec;
----- RESULTS
-1441548
----- TYPES
-BIGINT
-====
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/set.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index 5a2c56a..ffb53a1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -8,7 +8,6 @@ set buffer_pool_limit=7;
set all;
---- RESULTS: VERIFY_IS_SUBSET
'ABORT_ON_ERROR','0','REGULAR'
-'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED'
'BATCH_SIZE','0','DEVELOPMENT'
'BUFFER_POOL_LIMIT','','ADVANCED'
'DEBUG_ACTION','','DEVELOPMENT'
@@ -34,7 +33,6 @@ set explain_level=3;
set all;
---- RESULTS: VERIFY_IS_SUBSET
'ABORT_ON_ERROR','0','REGULAR'
-'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED'
'BATCH_SIZE','0','DEVELOPMENT'
'BUFFER_POOL_LIMIT','','ADVANCED'
'DEBUG_ACTION','','DEVELOPMENT'
@@ -60,7 +58,6 @@ set explain_level='0';
set all;
---- RESULTS: VERIFY_IS_SUBSET
'ABORT_ON_ERROR','0','REGULAR'
-'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED'
'BATCH_SIZE','0','DEVELOPMENT'
'BUFFER_POOL_LIMIT','','ADVANCED'
'DEBUG_ACTION','','DEVELOPMENT'
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/text-writer.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/text-writer.test b/testdata/workloads/functional-query/queries/QueryTest/text-writer.test
deleted file mode 100644
index 89cd730..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/text-writer.test
+++ /dev/null
@@ -1,47 +0,0 @@
-====
----- QUERY
-drop table if exists __text_write;
-====
----- QUERY
-create table __text_write (i int, s string, d double);
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __text_write select 0, "a", 1.1;
-====
----- QUERY
-SET COMPRESSION_CODEC=DEFAULT;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __text_write values (1, "b", 2.2);
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __text_write values (2, "c", 3.3);
-====
----- QUERY
-SET COMPRESSION_CODEC=GZIP;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __text_write values (3, "d", 4.4);
-====
----- QUERY
-SET COMPRESSION_CODEC=GZIP;
-SET ALLOW_UNSUPPORTED_FORMATS=0;
-insert into __text_write values (3, "d", 4.4);
----- CATCH
-Writing to compressed text table is not supported.
-====
----- QUERY
-select * from __text_write;
----- RESULTS
-0,'a',1.1
-1,'b',2.2
-2,'c',3.3
-3,'d',4.4
----- TYPES
-INT,STRING,DOUBLE
-====
----- QUERY
-drop table __text_write;
-====
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test b/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test
new file mode 100644
index 0000000..68f355f
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test
@@ -0,0 +1,77 @@
+====
+---- QUERY
+create table __text_write (i int, s string, d double);
+====
+---- QUERY
+SET COMPRESSION_CODEC=NONE;
+insert into __text_write select 0, "a", 1.1;
+====
+---- QUERY
+SET COMPRESSION_CODEC=GZIP;
+insert into __text_write values (3, "d", 4.4);
+---- CATCH
+Writing to compressed text table is not supported.
+====
+---- QUERY
+select * from __text_write;
+---- RESULTS
+0,'a',1.1
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+create table __avro_write (i int, s string, d double)
+stored as AVRO
+TBLPROPERTIES ('avro.schema.literal'='{
+ "name": "my_record",
+ "type": "record",
+ "fields": [
+ {"name":"i", "type":["int", "null"]},
+ {"name":"s", "type":["string", "null"]},
+ {"name":"d", "type":["double", "null"]}]}');
+====
+---- QUERY
+insert into __avro_write select 1, "b", 2.2;
+---- CATCH
+Writing to table format AVRO is not supported.
+====
+---- QUERY
+create table __seq_write (i int, s string, d double)
+stored as SEQUENCEFILE;
+====
+---- QUERY
+insert into __seq_write values (4, "e", 5.5);
+---- CATCH
+Writing to table format SEQUENCE_FILE is not supported.
+====
+---- QUERY
+# Test writing to mixed format table containing partitions in both supported and
+# unsupported formats, where writing to the partition with the supported format should succeed.
+# Create a table containing both text (supported) and avro (unsupported) partitions.
+create table __mixed_format_write (id int) partitioned by (part int);
+====
+---- QUERY
+insert into __mixed_format_write partition(part=2000) values(1);
+====
+---- QUERY
+insert into __mixed_format_write partition(part=2001) values(2);
+====
+---- QUERY
+alter table __mixed_format_write partition (part=2001) set fileformat AVRO;
+====
+---- QUERY
+insert into __mixed_format_write partition(part=2000) values(3);
+====
+---- QUERY
+insert into __mixed_format_write partition(part=2001) values(4);
+---- CATCH
+Writing to table format AVRO is not supported.
+====
+---- QUERY
+select id, part from __mixed_format_write where part = 2000;
+---- RESULTS
+1,2000
+3,2000
+---- TYPES
+INT,INT
+====
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/common/test_dimensions.py
----------------------------------------------------------------------
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 785cfa9..0460ea7 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -108,19 +108,6 @@ def create_parquet_dimension(workload):
return ImpalaTestDimension('table_format',
TableFormatInfo.create_from_string(dataset, 'parquet/none'))
-# Available Exec Options:
-#01: abort_on_error (bool)
-#02 max_errors (i32)
-#03: disable_codegen (bool)
-#04: batch_size (i32)
-#05: return_as_ascii (bool)
-#06: num_nodes (i32)
-#07: max_scan_range_length (i64)
-#08: num_scanner_threads (i32)
-#09: max_io_buffers (i32)
-#10: allow_unsupported_formats (bool)
-#11: partition_agg (bool)
-
# Common sets of values for the exec option vectors
ALL_BATCH_SIZES = [0]
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/hs2/test_hs2.py
----------------------------------------------------------------------
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index cd861e9..795f45c 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -89,11 +89,10 @@ class TestHS2(HS2TestSuite):
# Should be unchanged
assert vals2["SYNC_DDL"] == "0"
- # Verify that 'DEVELOPMENT' and 'DEPRECATED' options are not returned.
assert "MAX_ERRORS" in vals2
assert levels["MAX_ERRORS"] == "ADVANCED"
+ # Verify that 'DEVELOPMENT' options are not returned.
assert "DEBUG_ACTION" not in vals2
- assert "ALLOW_UNSUPPORTED_FORMATS" not in vals2
# Removed options should not be returned.
assert "MAX_IO_BUFFERS" not in vals2
@@ -101,7 +100,8 @@ class TestHS2(HS2TestSuite):
@needs_session()
def test_session_option_levels_via_set_all(self):
"""
- Tests the level of session options returned by a SET ALL query.
+ Tests the levels of session options returned by a SET ALL query, except DEPRECATED,
+ since we currently do not have any of those left.
"""
vals, levels = self.get_session_options("SET ALL")
@@ -109,12 +109,10 @@ class TestHS2(HS2TestSuite):
assert "SYNC_DDL" in vals
assert "MAX_ERRORS" in vals
assert "DEBUG_ACTION" in vals
- assert "ALLOW_UNSUPPORTED_FORMATS" in vals
assert levels["COMPRESSION_CODEC"] == "REGULAR"
assert levels["SYNC_DDL"] == "REGULAR"
assert levels["MAX_ERRORS"] == "ADVANCED"
assert levels["DEBUG_ACTION"] == "DEVELOPMENT"
- assert levels["ALLOW_UNSUPPORTED_FORMATS"] == "DEPRECATED"
# Removed options should not be returned.
assert "MAX_IO_BUFFERS" not in vals
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/metadata/test_partition_metadata.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index 1d77aa5..d23e3f0 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -45,10 +45,7 @@ class TestPartitionMetadata(ImpalaTestSuite):
# compression codecs.
cls.ImpalaTestMatrix.add_constraint(lambda v:
(v.get_value('table_format').file_format in ('text', 'parquet') and
- v.get_value('table_format').compression_codec == 'none') or
- (v.get_value('table_format').file_format in ('seq', 'avro') and
- v.get_value('table_format').compression_codec == 'snap' and
- v.get_value('table_format').compression_type == 'block'))
+ v.get_value('table_format').compression_codec == 'none'))
@SkipIfLocal.hdfs_client # TODO: this dependency might not exist anymore
def test_multiple_partitions_same_location(self, vector, unique_database):
@@ -70,9 +67,6 @@ class TestPartitionMetadata(ImpalaTestSuite):
self.client.execute("alter table %s add partition (j=2) location '%s/p'"
% (FQ_TBL_NAME, TBL_LOCATION))
- # Allow unsupported avro and sequence file writer.
- self.client.execute("set allow_unsupported_formats=true")
-
# Insert some data. This will only update partition j=1 (IMPALA-1480).
self.client.execute("insert into table %s partition(j=1) select 1" % FQ_TBL_NAME)
# Refresh to update file metadata of both partitions
@@ -80,31 +74,19 @@ class TestPartitionMetadata(ImpalaTestSuite):
# The data will be read twice because each partition points to the same location.
data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
- if file_format == 'avro':
- # Avro writer is broken and produces nulls. Only check partition column.
- assert data.split('\t')[1] == '3'
- else:
- assert data.split('\t') == ['2', '3']
+ assert data.split('\t') == ['2', '3']
self.client.execute("insert into %s partition(j) select 1, 1" % FQ_TBL_NAME)
self.client.execute("insert into %s partition(j) select 1, 2" % FQ_TBL_NAME)
self.client.execute("refresh %s" % FQ_TBL_NAME)
data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
- if file_format == 'avro':
- # Avro writer is broken and produces nulls. Only check partition column.
- assert data.split('\t')[1] == '9'
- else:
- assert data.split('\t') == ['6', '9']
+ assert data.split('\t') == ['6', '9']
# Force all scan ranges to be on the same node. It should produce the same
# result as above. See IMPALA-5412.
self.client.execute("set num_nodes=1")
data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
- if file_format == 'avro':
- # Avro writer is broken and produces nulls. Only check partition column.
- assert data.split('\t')[1] == '9'
- else:
- assert data.split('\t') == ['6', '9']
+ assert data.split('\t') == ['6', '9']
@SkipIfS3.hive
@SkipIfADLS.hive
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 694cfe9..2896632 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -129,71 +129,25 @@ class TestCompressedFormats(ImpalaTestSuite):
finally:
call(["hive", "-e", drop_cmd]);
-class TestTableWriters(ImpalaTestSuite):
+class TestUnsupportedTableWriters(ImpalaTestSuite):
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
- super(TestTableWriters, cls).add_test_dimensions()
+ super(TestUnsupportedTableWriters, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
- # This class tests many formats, but doesn't use the contraints
- # Each format is tested within one test file, we constrain to text/none
- # as each test file only needs to be run once.
+ # This class tests different formats, but doesn't use constraints.
+ # The constraint added below is only to make sure that the test file runs once.
cls.ImpalaTestMatrix.add_constraint(lambda v:
(v.get_value('table_format').file_format =='text' and
v.get_value('table_format').compression_codec == 'none'))
- def test_seq_writer(self, vector, unique_database):
- self.run_test_case('QueryTest/seq-writer', vector, unique_database)
-
- @SkipIfS3.hive
- @SkipIfADLS.hive
- @SkipIfIsilon.hive
- @SkipIfLocal.hive
- def test_seq_writer_hive_compatibility(self, vector, unique_database):
- self.client.execute('set ALLOW_UNSUPPORTED_FORMATS=1')
- # Write sequence files with different compression codec/compression mode and then read
- # it back in Impala and Hive.
- # Note that we don't test snappy here as the snappy codec used by Impala does not seem
- # to be fully compatible with the snappy codec used by Hive.
- for comp_codec, comp_mode in [('NONE', 'RECORD'), ('NONE', 'BLOCK'),
- ('DEFAULT', 'RECORD'), ('DEFAULT', 'BLOCK'),
- ('GZIP', 'RECORD'), ('GZIP', 'BLOCK')]:
- table_name = '%s.seq_tbl_%s_%s' % (unique_database, comp_codec, comp_mode)
- self.client.execute('set COMPRESSION_CODEC=%s' % comp_codec)
- self.client.execute('set SEQ_COMPRESSION_MODE=%s' % comp_mode)
- self.client.execute('create table %s like functional.zipcode_incomes stored as '
- 'sequencefile' % table_name)
- # Write sequence file of size greater than 4K
- self.client.execute('insert into %s select * from functional.zipcode_incomes where '
- 'zip >= "5"' % table_name)
- # Write sequence file of size less than 4K
- self.client.execute('insert into %s select * from functional.zipcode_incomes where '
- 'zip="00601"' % table_name)
-
- count_query = 'select count(*) from %s' % table_name
-
- # Read it back in Impala
- output = self.client.execute(count_query)
- assert '16541' == output.get_data()
- # Read it back in Hive
- # Note that username is passed in for the sake of remote cluster tests. The default
- # HDFS user is typically 'hdfs', and this is needed to run a count() operation using
- # hive. For local mini clusters, the usename can be anything. See IMPALA-5413.
- output = self.run_stmt_in_hive(count_query, username='hdfs')
- assert '16541' == output.split('\n')[1]
-
- def test_avro_writer(self, vector):
- self.run_test_case('QueryTest/avro-writer', vector)
-
- def test_text_writer(self, vector):
- # TODO debug this test.
- # This caused by a zlib failure. Suspected cause is too small a buffer
- # passed to zlib for compression; similar to IMPALA-424
- pytest.skip()
- self.run_test_case('QueryTest/text-writer', vector)
+ def test_error_message(self, vector, unique_database):
+ # Tests that an appropriate error message is displayed for unsupported writers like
+ # compressed text, avro and sequence.
+ self.run_test_case('QueryTest/unsupported-writers', vector, unique_database)
@pytest.mark.execute_serially
class TestLargeCompressedFile(ImpalaTestSuite):
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index fe631cf..eac9d27 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -389,11 +389,10 @@ class TestImpalaShellInteractive(object):
assert "APPX_COUNT_DISTINCT" in result.stdout
assert "SUPPORT_START_OVER" in result.stdout
# Development, deprecated and removed options should not be shown.
+ # Note: there are currently no deprecated options.
assert "Development Query Options:" not in result.stdout
- assert "DEBUG_ACTION" not in result.stdout
- assert "Deprecated Query Options:" not in result.stdout
- assert "ALLOW_UNSUPPORTED_FORMATS" not in result.stdout
- assert "MAX_IO_BUFFERS" not in result.stdout
+ assert "DEBUG_ACTION" not in result.stdout # Development option.
+ assert "MAX_IO_BUFFERS" not in result.stdout # Removed option.
shell2 = ImpalaShell()
shell2.send_cmd("set all")
@@ -401,7 +400,7 @@ class TestImpalaShellInteractive(object):
assert "Query options (defaults shown in []):" in result.stdout
assert "Advanced Query Options:" in result.stdout
assert "Development Query Options:" in result.stdout
- assert "Deprecated Query Options:" in result.stdout
+ assert "Deprecated Query Options:" not in result.stdout
advanced_part_start_idx = result.stdout.find("Advanced Query Options")
development_part_start_idx = result.stdout.find("Development Query Options")
deprecated_part_start_idx = result.stdout.find("Deprecated Query Options")
@@ -411,7 +410,6 @@ class TestImpalaShellInteractive(object):
assert "APPX_COUNT_DISTINCT" in advanced_part
assert "SUPPORT_START_OVER" in advanced_part
assert "DEBUG_ACTION" in development_part
- assert "ALLOW_UNSUPPORTED_FORMATS" in result.stdout[deprecated_part_start_idx:]
# Removed options should not be shown.
assert "MAX_IO_BUFFERS" not in result.stdout