Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/04 01:07:46 UTC

[4/6] impala git commit: IMPALA-7190: Remove unsupported format writer support

IMPALA-7190: Remove unsupported format writer support

This patch removes write support for the unsupported formats SequenceFile,
Avro, and compressed text. The related query options
ALLOW_UNSUPPORTED_FORMATS and SEQ_COMPRESSION_MODE have been migrated to
the REMOVED query options type.

Testing:
Ran exhaustive build.

Change-Id: I821dc7495a901f1658daa500daf3791b386c7185
Reviewed-on: http://gerrit.cloudera.org:8080/10823
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/30e82c63
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/30e82c63
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/30e82c63

Branch: refs/heads/master
Commit: 30e82c63ecdd56ded10fed931d95ab6d994b9244
Parents: 6f52ce1
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Jun 25 18:11:08 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 3 20:34:27 2018 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   2 -
 be/src/exec/hdfs-avro-table-writer.cc           | 295 ---------------
 be/src/exec/hdfs-avro-table-writer.h            | 121 -------
 be/src/exec/hdfs-sequence-table-writer.cc       | 361 -------------------
 be/src/exec/hdfs-sequence-table-writer.h        | 194 ----------
 be/src/exec/hdfs-table-sink.cc                  |  48 +--
 be/src/exec/hdfs-text-table-writer.cc           |  61 +---
 be/src/exec/hdfs-text-table-writer.h            |   9 -
 be/src/service/query-options-test.cc            |   2 -
 be/src/service/query-options.cc                 |  16 -
 be/src/service/query-options.h                  |   5 +-
 common/thrift/ImpalaInternalService.thrift      |   6 -
 common/thrift/ImpalaService.thrift              |   6 +-
 .../apache/impala/planner/PlannerTestBase.java  |   1 -
 testdata/bad_avro_snap/README                   |   4 +-
 .../queries/QueryTest/avro-writer.test          |  43 ---
 .../queries/QueryTest/seq-writer.test           | 308 ----------------
 .../functional-query/queries/QueryTest/set.test |   3 -
 .../queries/QueryTest/text-writer.test          |  47 ---
 .../queries/QueryTest/unsupported-writers.test  |  77 ++++
 tests/common/test_dimensions.py                 |  13 -
 tests/hs2/test_hs2.py                           |   8 +-
 tests/metadata/test_partition_metadata.py       |  26 +-
 tests/query_test/test_compressed_formats.py     |  62 +---
 tests/shell/test_shell_interactive.py           |  10 +-
 25 files changed, 121 insertions(+), 1607 deletions(-)
----------------------------------------------------------------------
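
The user-visible effect of this change (see the hdfs-table-sink.cc hunk below) is that INSERTs targeting SequenceFile or Avro tables, or compressed text, now always fail at sink initialization instead of being gated on ALLOW_UNSUPPORTED_FORMATS. A minimal standalone sketch of that gate, using simplified stand-in types rather than Impala's real descriptors:

#include <optional>
#include <string>

// Hypothetical stand-ins for THdfsFileFormat / THdfsCompression.
enum class FileFormat { TEXT, PARQUET, SEQUENCE_FILE, AVRO };
enum class Compression { NONE, GZIP, SNAPPY };

// Returns an error message if the target partition is not writable;
// std::nullopt means the insert may proceed. The check is now unconditional.
std::optional<std::string> CheckWritableFormat(FileFormat format, Compression codec) {
  if (format == FileFormat::SEQUENCE_FILE || format == FileFormat::AVRO) {
    return "Writing to this table format is not supported.";
  }
  if (format == FileFormat::TEXT && codec != Compression::NONE) {
    return "Writing to compressed text table is not supported.";
  }
  return std::nullopt;
}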


http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 1753cb0..4544b95 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -56,12 +56,10 @@ add_library(Exec
   hdfs-rcfile-scanner.cc
   hdfs-sequence-scanner.cc
   hdfs-avro-scanner.cc
-  hdfs-avro-table-writer.cc
   hdfs-avro-scanner-ir.cc
   hdfs-plugin-text-scanner.cc
   hdfs-text-scanner.cc
   hdfs-text-table-writer.cc
-  hdfs-sequence-table-writer.cc
   hdfs-parquet-scanner.cc
   hdfs-parquet-scanner-ir.cc
   hdfs-parquet-table-writer.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc
deleted file mode 100644
index 3ce296d..0000000
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ /dev/null
@@ -1,295 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hdfs-avro-table-writer.h"
-
-#include <vector>
-#include <hdfs.h>
-#include <boost/scoped_ptr.hpp>
-#include <stdlib.h>
-#include <gutil/strings/substitute.h>
-
-#include "exec/exec-node.h"
-#include "exec/hdfs-table-sink.h"
-#include "util/compress.h"
-#include "util/hdfs-util.h"
-#include "util/uid-util.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/raw-value.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/hdfs-fs-cache.h"
-#include "runtime/types.h"
-#include "util/runtime-profile-counters.h"
-#include "write-stream.inline.h"
-
-#include "common/names.h"
-
-using namespace strings;
-using namespace impala;
-
-const uint8_t OBJ1[4] = {'O', 'b', 'j', 1};
-const char* AVRO_SCHEMA_STR = "avro.schema";
-const char* AVRO_CODEC_STR = "avro.codec";
-const THdfsCompression::type AVRO_DEFAULT_CODEC = THdfsCompression::SNAPPY;
-// Desired size of each Avro block (bytes); actual block size will vary +/- the
-// size of a row. This is approximate size of the block before compression.
-const int DEFAULT_AVRO_BLOCK_SIZE = 64 * 1024;
-
-HdfsAvroTableWriter::HdfsAvroTableWriter(HdfsTableSink* parent,
-    RuntimeState* state, OutputPartition* output,
-    const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc)
-  : HdfsTableWriter(parent, state, output, partition, table_desc),
-    unflushed_rows_(0) {
-  mem_pool_.reset(new MemPool(parent->mem_tracker()));
-}
-
-void HdfsAvroTableWriter::ConsumeRow(TupleRow* row) {
-  ++unflushed_rows_;
-  int num_non_partition_cols =
-      table_desc_->num_cols() - table_desc_->num_clustering_cols();
-  for (int j = 0; j < num_non_partition_cols; ++j) {
-    void* value = output_expr_evals_[j]->GetValue(row);
-    AppendField(output_expr_evals_[j]->root().type(), value);
-  }
-}
-
-inline void HdfsAvroTableWriter::AppendField(const ColumnType& type, const void* value) {
-  // Each avro field is written as union, which is a ZLong indicating the union
-  // field followed by the encoded value. Impala/Hive always stores values as
-  // a union of [ColumnType, NULL].
-  // TODO: the writer may be asked to write [NULL, ColumnType] unions. It is wrong
-  // for us to assume [ColumnType, NULL].
-
-  if (value == NULL) {
-    // indicate the second field of the union
-    out_.WriteZLong(1);
-    // No bytes are written for a null value.
-    return;
-  }
-
-  // indicate that we are using the first field of the union
-  out_.WriteZLong(0);
-
-  switch (type.type) {
-    case TYPE_BOOLEAN:
-      out_.WriteByte(*reinterpret_cast<const char*>(value));
-      break;
-    case TYPE_TINYINT:
-      out_.WriteZInt(*reinterpret_cast<const int8_t*>(value));
-      break;
-    case TYPE_SMALLINT:
-      out_.WriteZInt(*reinterpret_cast<const int16_t*>(value));
-      break;
-    case TYPE_INT:
-      out_.WriteZInt(*reinterpret_cast<const int32_t*>(value));
-      break;
-    case TYPE_BIGINT:
-      out_.WriteZLong(*reinterpret_cast<const int64_t*>(value));
-      break;
-    case TYPE_FLOAT:
-      out_.WriteBytes(4, reinterpret_cast<const char*>(value));
-      break;
-    case TYPE_DOUBLE:
-      out_.WriteBytes(8, reinterpret_cast<const char*>(value));
-      break;
-    case TYPE_STRING: {
-      const StringValue& sv = *reinterpret_cast<const StringValue*>(value);
-      out_.WriteZLong(sv.len);
-      out_.WriteBytes(sv.len, sv.ptr);
-      break;
-    }
-    case TYPE_DECIMAL: {
-      int byte_size = ColumnType::GetDecimalByteSize(type.precision);
-      out_.WriteZLong(byte_size);
-#if __BYTE_ORDER == __LITTLE_ENDIAN
-      char tmp[16];
-      BitUtil::ByteSwap(tmp, value, byte_size);
-      out_.WriteBytes(byte_size, tmp);
-#else
-      out_.WriteBytes(byte_size, reinterpret_cast<const char*>(value));
-#endif
-      break;
-    }
-    case TYPE_TIMESTAMP:
-    case TYPE_BINARY:
-    case INVALID_TYPE:
-    case TYPE_NULL:
-    case TYPE_DATE:
-    case TYPE_DATETIME:
-    default:
-      DCHECK(false);
-  }
-}
-
-Status HdfsAvroTableWriter::Init() {
-  // create the Sync marker
-  sync_marker_ = GenerateUUIDString();
-
-  THdfsCompression::type codec = AVRO_DEFAULT_CODEC;
-  if (state_->query_options().__isset.compression_codec) {
-    codec = state_->query_options().compression_codec;
-  }
-
-  // sets codec_name_ and compressor_
-  codec_type_ = codec;
-  switch (codec) {
-    case THdfsCompression::SNAPPY:
-      codec_name_ = "snappy";
-      break;
-    case THdfsCompression::DEFLATE:
-      codec_name_ = "deflate";
-      break;
-    case THdfsCompression::NONE:
-      codec_name_ = "null";
-      return Status::OK();
-    default:
-      const char* name = _THdfsCompression_VALUES_TO_NAMES.find(codec)->second;
-      return Status(Substitute(
-          "Avro only supports NONE, DEFLATE, and SNAPPY codecs; unsupported codec $0",
-          name));
-  }
-  RETURN_IF_ERROR(Codec::CreateCompressor(mem_pool_.get(), true, codec, &compressor_));
-  DCHECK(compressor_.get() != NULL);
-
-  return Status::OK();
-}
-
-void HdfsAvroTableWriter::Close() {
-  mem_pool_->FreeAll();
-}
-
-Status HdfsAvroTableWriter::AppendRows(
-    RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
-  int32_t limit;
-  bool all_rows = row_group_indices.empty();
-  if (all_rows) {
-    limit = batch->num_rows();
-  } else {
-    limit = row_group_indices.size();
-  }
-  COUNTER_ADD(parent_->rows_inserted_counter(), limit);
-
-  {
-    SCOPED_TIMER(parent_->encode_timer());
-    for (int row_idx = 0; row_idx < limit; ++row_idx) {
-      TupleRow* row = all_rows ?
-          batch->GetRow(row_idx) : batch->GetRow(row_group_indices[row_idx]);
-      ConsumeRow(row);
-    }
-  }
-
-  if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) RETURN_IF_ERROR(Flush());
-  *new_file = false;
-  return Status::OK();
-}
-
-Status HdfsAvroTableWriter::WriteFileHeader() {
-  out_.Clear();
-  out_.WriteBytes(4, reinterpret_cast<const uint8_t*>(OBJ1));
-
-  // Write 'File Metadata' as an encoded avro map
-  // number of key/value pairs in the map
-  out_.WriteZLong(2);
-
-  // Schema information
-  out_.WriteZLong(strlen(AVRO_SCHEMA_STR));
-  out_.WriteBytes(strlen(AVRO_SCHEMA_STR), AVRO_SCHEMA_STR);
-  const string& avro_schema = table_desc_->avro_schema();
-  out_.WriteZLong(avro_schema.size());
-  out_.WriteBytes(avro_schema.size(), avro_schema.data());
-
-  // codec information
-  out_.WriteZLong(strlen(AVRO_CODEC_STR));
-  out_.WriteBytes(strlen(AVRO_CODEC_STR), AVRO_CODEC_STR);
-  out_.WriteZLong(codec_name_.size());
-  out_.WriteBytes(codec_name_.size(), codec_name_.data());
-
-  // Write end of map marker
-  out_.WriteZLong(0);
-
-  out_.WriteBytes(sync_marker_.size(), sync_marker_.data());
-
-  const string& text = out_.String();
-  RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(text.c_str()),
-                        text.size()));
-  out_.Clear();
-  return Status::OK();
-}
-
-Status HdfsAvroTableWriter::Flush() {
-  if (unflushed_rows_ == 0) return Status::OK();
-
-  WriteStream header;
-  // 1. Count of objects in this block
-  header.WriteZLong(unflushed_rows_);
-
-  const uint8_t* output;
-  int64_t output_length;
-  // Snappy format requires a CRC after the compressed data
-  uint32_t crc;
-  const string& text = out_.String();
-
-  if (codec_type_ != THdfsCompression::NONE) {
-    SCOPED_TIMER(parent_->compress_timer());
-    uint8_t* temp;
-    RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
-        reinterpret_cast<const uint8_t*>(text.data()), &output_length, &temp));
-    output = temp;
-    if (codec_type_ == THdfsCompression::SNAPPY) {
-      crc = SnappyCompressor::ComputeChecksum(
-          text.size(), reinterpret_cast<const uint8_t*>(text.data()));
-    }
-  } else {
-    output = reinterpret_cast<const uint8_t*>(text.data());
-    output_length = out_.Size();
-  }
-
-  // 2. length of serialized objects
-  if (codec_type_ == THdfsCompression::SNAPPY) {
-    // + 4 for the CRC checksum at the end of the compressed block
-    header.WriteZLong(output_length + 4);
-  } else {
-    header.WriteZLong(output_length);
-  }
-
-  const string& head = header.String();
-  {
-    SCOPED_TIMER(parent_->hdfs_write_timer());
-    // Flush (1) and (2) to HDFS
-    RETURN_IF_ERROR(
-        Write(reinterpret_cast<const uint8_t*>(head.data()), head.size()));
-    // 3. serialized objects
-    RETURN_IF_ERROR(Write(output, output_length));
-
-    // Write CRC checksum
-    if (codec_type_ == THdfsCompression::SNAPPY) {
-      RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(&crc), sizeof(uint32_t)));
-    }
-  }
-
-  // 4. sync marker
-  RETURN_IF_ERROR(
-      Write(reinterpret_cast<const uint8_t*>(sync_marker_.data()), sync_marker_.size()));
-
-  out_.Clear();
-  unflushed_rows_ = 0;
-  return Status::OK();
-}
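
The removed AppendField() above leans on Avro's union and zig-zag varint ("ZLong"/"ZInt") encoding: every nullable column is written as a union branch index (0 for the value branch, 1 for null) followed by the encoded value. A self-contained sketch of that encoding for a nullable INT column, assuming the standard Avro zig-zag/varint layout; WriteZigZagLong is an illustrative helper, not Impala's WriteStream API:

#include <cstdint>
#include <string>

// Appends an Avro "long": zig-zag mapping followed by base-128 varint bytes.
void WriteZigZagLong(int64_t v, std::string* out) {
  uint64_t zz = (static_cast<uint64_t>(v) << 1) ^ static_cast<uint64_t>(v >> 63);
  do {
    uint8_t byte = zz & 0x7F;
    zz >>= 7;
    if (zz != 0) byte |= 0x80;  // high bit set: more bytes follow
    out->push_back(static_cast<char>(byte));
  } while (zz != 0);
}

// Encodes one nullable INT as the union ["int", "null"], matching the
// [value, null] branch order the removed writer assumed.
void AppendNullableInt(const int32_t* value, std::string* out) {
  if (value == nullptr) {
    WriteZigZagLong(1, out);  // branch 1: null; no value bytes follow
    return;
  }
  WriteZigZagLong(0, out);       // branch 0: the int value
  WriteZigZagLong(*value, out);  // Avro ints use the same zig-zag varint form
}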

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-avro-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h
deleted file mode 100644
index 6966860..0000000
--- a/be/src/exec/hdfs-avro-table-writer.h
+++ /dev/null
@@ -1,121 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_EXEC_HDFS_AVRO_WRITER_H
-#define IMPALA_EXEC_HDFS_AVRO_WRITER_H
-
-#include <hdfs.h>
-#include <sstream>
-#include <string>
-
-#include "common/status.h"
-#include "exec/hdfs-table-writer.h"
-#include "runtime/mem-pool.h"
-#include "util/codec.h"
-#include "exec/write-stream.h"
-
-namespace impala {
-
-struct ColumnType;
-class HdfsTableSink;
-class RuntimeState;
-class ScalarExprEvaluator;
-class TupleDescriptor;
-class TupleRow;
-struct OutputPartition;
-struct StringValue;
-
-/// Consumes rows and outputs the rows into an Avro file in HDFS
-/// Each Avro file contains a block of records (rows). The file metadata specifies the
-/// schema of the records in addition to the name of the codec, if any, used to compress
-/// blocks. The structure is:
-///   [ Metadata ]
-///   [ Sync Marker ]
-///   [ Data Block ]
-///     ...
-///   [ Data Block ]
-//
-/// Each Data Block consists of:
-///   [ Number of Rows in Block ]
-///   [ Size of serialized objects, after compression ]
-///   [ Serialized objects, compressed ]
-///   [ Sync Marker ]
-//
-/// If compression is used, each block is compressed individually. The block size defaults
-/// to about 64KB before compression.
-/// This writer implements the Avro 1.7.7 spec:
-/// http://avro.apache.org/docs/1.7.7/spec.html
-class HdfsAvroTableWriter : public HdfsTableWriter {
- public:
-  HdfsAvroTableWriter(HdfsTableSink* parent,
-                      RuntimeState* state, OutputPartition* output,
-                      const HdfsPartitionDescriptor* partition,
-                      const HdfsTableDescriptor* table_desc);
-
-  virtual ~HdfsAvroTableWriter() { }
-
-  virtual Status Init() override;
-  virtual Status Finalize() override { return Flush(); }
-  virtual Status InitNewFile() override { return WriteFileHeader(); }
-  virtual void Close() override;
-  virtual uint64_t default_block_size() const override { return 0; }
-  virtual std::string file_extension() const override { return "avro"; }
-
-  /// Outputs the given rows into an HDFS sequence file. The rows are buffered
-  /// to fill a sequence file block.
-  virtual Status AppendRows(RowBatch* rows,
-      const std::vector<int32_t>& row_group_indices, bool* new_file) override;
-
- private:
-  /// Processes a single row, appending to out_
-  void ConsumeRow(TupleRow* row);
-
-  /// Adds an encoded field to out_
-  inline void AppendField(const ColumnType& type, const void* value);
-
-  /// Writes the Avro file header to HDFS
-  Status WriteFileHeader() WARN_UNUSED_RESULT;
-
-  /// Writes the contents of out_ to HDFS as a single Avro file block.
-  /// Returns an error if write to HDFS fails.
-  Status Flush() WARN_UNUSED_RESULT;
-
-  /// Buffer which holds accumulated output
-  WriteStream out_;
-
-  /// Memory pool used by codec to allocate output buffer.
-  /// Owned by this class. Initialized using parent's memtracker.
-  boost::scoped_ptr<MemPool> mem_pool_;
-
-  /// Number of rows consumed since last flush
-  uint64_t unflushed_rows_;
-
-  /// Name of codec, only set if codec_type_ != NONE
-  std::string codec_name_;
-
-  /// Type of the codec, will be NONE if no compression is used
-  THdfsCompression::type codec_type_;
-
-  /// The codec for compressing, only set if codec_type_ != NONE
-  boost::scoped_ptr<Codec> compressor_;
-
-  /// 16 byte sync marker (a uuid)
-  std::string sync_marker_;
-};
-
-} // namespace impala
-#endif
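
The header comment above describes the object container layout the writer produced: magic bytes, a metadata map carrying avro.schema and avro.codec, a 16-byte sync marker, then data blocks of (row count, byte length, serialized rows, sync marker). A compact sketch of that framing for a single uncompressed block, assuming the Avro 1.7.7 container spec; the schema string and sync marker here are caller-supplied placeholders:

#include <cstdint>
#include <string>

// Zig-zag + varint "long", as in the Avro spec (same helper as the sketch above).
void WriteAvroLong(int64_t v, std::string* out) {
  uint64_t zz = (static_cast<uint64_t>(v) << 1) ^ static_cast<uint64_t>(v >> 63);
  do {
    uint8_t b = zz & 0x7F;
    zz >>= 7;
    if (zz != 0) b |= 0x80;
    out->push_back(static_cast<char>(b));
  } while (zz != 0);
}

void WriteAvroString(const std::string& s, std::string* out) {
  WriteAvroLong(static_cast<int64_t>(s.size()), out);
  out->append(s);
}

// Builds magic + metadata map + sync marker, followed by one data block.
std::string BuildAvroContainer(const std::string& schema_json,
                               const std::string& sync_marker,  // must be 16 bytes
                               const std::string& encoded_rows, int64_t num_rows) {
  std::string out;
  out.append("Obj\x01", 4);          // magic: 'O' 'b' 'j' 1
  WriteAvroLong(2, &out);            // metadata map with 2 key/value pairs
  WriteAvroString("avro.schema", &out);
  WriteAvroString(schema_json, &out);
  WriteAvroString("avro.codec", &out);
  WriteAvroString("null", &out);     // "null" codec == no compression
  WriteAvroLong(0, &out);            // end-of-map marker
  out.append(sync_marker);
  WriteAvroLong(num_rows, &out);     // data block: number of rows
  WriteAvroLong(static_cast<int64_t>(encoded_rows.size()), &out);  // byte length
  out.append(encoded_rows);          // serialized rows (uncompressed here)
  out.append(sync_marker);           // block terminator
  return out;
}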

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
deleted file mode 100644
index 42a70f0..0000000
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ /dev/null
@@ -1,361 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hdfs-sequence-table-writer.h"
-#include "exec/write-stream.inline.h"
-#include "exec/exec-node.h"
-#include "util/hdfs-util.h"
-#include "util/uid-util.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/raw-value.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/hdfs-fs-cache.h"
-#include "util/runtime-profile-counters.h"
-
-#include <vector>
-#include <hdfs.h>
-#include <boost/scoped_ptr.hpp>
-#include <stdlib.h>
-
-#include "common/names.h"
-
-namespace impala {
-
-const uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
-const char* HdfsSequenceTableWriter::VALUE_CLASS_NAME = "org.apache.hadoop.io.Text";
-const char* HdfsSequenceTableWriter::KEY_CLASS_NAME =
-    "org.apache.hadoop.io.BytesWritable";
-
-HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent,
-    RuntimeState* state, OutputPartition* output,
-    const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc)
-  : HdfsTableWriter(parent, state, output, partition, table_desc),
-    mem_pool_(new MemPool(parent->mem_tracker())), compress_flag_(false),
-    unflushed_rows_(0), record_compression_(false) {
-  approx_block_size_ = 64 * 1024 * 1024;
-  parent->mem_tracker()->Consume(approx_block_size_);
-  field_delim_ = partition->field_delim();
-  escape_char_ = partition->escape_char();
-}
-
-Status HdfsSequenceTableWriter::Init() {
-  THdfsCompression::type codec = THdfsCompression::SNAPPY_BLOCKED;
-  const TQueryOptions& query_options = state_->query_options();
-  if (query_options.__isset.compression_codec) {
-    codec = query_options.compression_codec;
-    if (codec == THdfsCompression::SNAPPY) {
-      // Seq file (and in general things that use hadoop.io.codec) always
-      // mean snappy_blocked.
-      codec = THdfsCompression::SNAPPY_BLOCKED;
-    }
-  }
-  if (codec != THdfsCompression::NONE) {
-    compress_flag_ = true;
-    if (query_options.__isset.seq_compression_mode) {
-      record_compression_ =
-          query_options.seq_compression_mode == THdfsSeqCompressionMode::RECORD;
-    }
-    RETURN_IF_ERROR(Codec::GetHadoopCodecClassName(codec, &codec_name_));
-    RETURN_IF_ERROR(Codec::CreateCompressor(
-        mem_pool_.get(), true, codec_name_, &compressor_));
-    DCHECK(compressor_.get() != NULL);
-  }
-
-  // create the Sync marker
-  string uuid = GenerateUUIDString();
-  uint8_t sync_neg1[20];
-
-  ReadWriteUtil::PutInt(sync_neg1, static_cast<uint32_t>(-1));
-  DCHECK(uuid.size() == 16);
-  memcpy(sync_neg1 + sizeof(int32_t), uuid.data(), uuid.size());
-  neg1_sync_marker_ = string(reinterpret_cast<char*>(sync_neg1), 20);
-  sync_marker_ = uuid;
-
-  return Status::OK();
-}
-
-Status HdfsSequenceTableWriter::AppendRows(
-    RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
-  int32_t limit;
-  if (row_group_indices.empty()) {
-    limit = batch->num_rows();
-  } else {
-    limit = row_group_indices.size();
-  }
-  COUNTER_ADD(parent_->rows_inserted_counter(), limit);
-
-  bool all_rows = row_group_indices.empty();
-  int num_non_partition_cols =
-      table_desc_->num_cols() - table_desc_->num_clustering_cols();
-  DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols)
-      << parent_->DebugString();
-
-  {
-    SCOPED_TIMER(parent_->encode_timer());
-    if (all_rows) {
-      for (int row_idx = 0; row_idx < limit; ++row_idx) {
-        RETURN_IF_ERROR(ConsumeRow(batch->GetRow(row_idx)));
-      }
-    } else {
-      for (int row_idx = 0; row_idx < limit; ++row_idx) {
-        TupleRow* row = batch->GetRow(row_group_indices[row_idx]);
-        RETURN_IF_ERROR(ConsumeRow(row));
-      }
-    }
-  }
-
-  if (!compress_flag_) {
-    out_.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
-  }
-
-  if (out_.Size() >= approx_block_size_) RETURN_IF_ERROR(Flush());
-  *new_file = false;
-  return Status::OK();
-}
-
-Status HdfsSequenceTableWriter::WriteFileHeader() {
-  out_.WriteBytes(sizeof(SEQ6_CODE), SEQ6_CODE);
-
-  // Setup to be correct key class
-  out_.WriteText(strlen(KEY_CLASS_NAME),
-      reinterpret_cast<const uint8_t*>(KEY_CLASS_NAME));
-
-  // Setup to be correct value class
-  out_.WriteText(strlen(VALUE_CLASS_NAME),
-      reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME));
-
-  // Flag for if compression is used
-  out_.WriteBoolean(compress_flag_);
-  // Only valid if compression is used. Indicates if block compression is used.
-  out_.WriteBoolean(compress_flag_ && !record_compression_);
-
-  // Output the name of our compression codec, parsed by readers
-  if (compress_flag_) {
-    out_.WriteText(codec_name_.size(),
-        reinterpret_cast<const uint8_t*>(codec_name_.data()));
-  }
-
-  // Meta data is formated as an integer N followed by N*2 strings,
-  // which are key-value pairs. Hive does not write meta data, so neither does Impala
-  out_.WriteInt(0);
-
-  // write the sync marker
-  out_.WriteBytes(sync_marker_.size(), sync_marker_.data());
-
-  string text = out_.String();
-  RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(text.c_str()), text.size()));
-  out_.Clear();
-  return Status::OK();
-}
-
-Status HdfsSequenceTableWriter::WriteCompressedBlock() {
-  WriteStream record;
-  uint8_t *output;
-  int64_t output_length;
-  DCHECK(compress_flag_);
-
-  // Add a sync marker to start of the block
-  record.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
-
-  // Output the number of rows in this block
-  record.WriteVLong(unflushed_rows_);
-
-  // Output compressed key-lengths block-size & compressed key-lengths block.
-  // The key-lengths block contains byte value of 4 as a key length for each row (this is
-  // what Hive does).
-  string key_lengths_text(unflushed_rows_, '\x04');
-  {
-    SCOPED_TIMER(parent_->compress_timer());
-    RETURN_IF_ERROR(compressor_->ProcessBlock(false, key_lengths_text.size(),
-        reinterpret_cast<const uint8_t*>(key_lengths_text.data()), &output_length,
-        &output));
-  }
-  record.WriteVInt(output_length);
-  record.WriteBytes(output_length, output);
-
-  // Output compressed keys block-size & compressed keys block.
-  // The keys block contains "\0\0\0\0" byte sequence as a key for each row (this is what
-  // Hive does).
-  string keys_text(unflushed_rows_ * 4, '\0');
-  {
-    SCOPED_TIMER(parent_->compress_timer());
-    RETURN_IF_ERROR(compressor_->ProcessBlock(false, keys_text.size(),
-        reinterpret_cast<const uint8_t*>(keys_text.data()), &output_length, &output));
-  }
-  record.WriteVInt(output_length);
-  record.WriteBytes(output_length, output);
-
-  // Output compressed value-lengths block-size & compressed value-lengths block
-  string value_lengths_text = out_value_lengths_block_.String();
-  {
-    SCOPED_TIMER(parent_->compress_timer());
-    RETURN_IF_ERROR(compressor_->ProcessBlock(false, value_lengths_text.size(),
-        reinterpret_cast<const uint8_t*>(value_lengths_text.data()), &output_length, &output));
-  }
-  record.WriteVInt(output_length);
-  record.WriteBytes(output_length, output);
-
-  // Output compressed values block-size & compressed values block
-  string text = out_.String();
-  {
-    SCOPED_TIMER(parent_->compress_timer());
-    RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
-        reinterpret_cast<const uint8_t*>(text.data()), &output_length, &output));
-  }
-  record.WriteVInt(output_length);
-  record.WriteBytes(output_length, output);
-
-  string rec = record.String();
-  RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(rec.data()), rec.size()));
-  return Status::OK();
-}
-
-inline void HdfsSequenceTableWriter::WriteEscapedString(const StringValue* str_val,
-                                                       WriteStream* buf) {
-  for (int i = 0; i < str_val->len; ++i) {
-    if (str_val->ptr[i] == field_delim_ || str_val->ptr[i] == escape_char_) {
-      buf->WriteByte(escape_char_);
-    }
-    buf->WriteByte(str_val->ptr[i]);
-  }
-}
-
-void HdfsSequenceTableWriter::EncodeRow(TupleRow* row, WriteStream* buf) {
-  // TODO Unify with text table writer
-  int num_non_partition_cols =
-      table_desc_->num_cols() - table_desc_->num_clustering_cols();
-  DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols)
-      << parent_->DebugString();
-  for (int j = 0; j < num_non_partition_cols; ++j) {
-    void* value = output_expr_evals_[j]->GetValue(row);
-    if (value != NULL) {
-      if (output_expr_evals_[j]->root().type().type == TYPE_STRING) {
-        WriteEscapedString(reinterpret_cast<const StringValue*>(value), &row_buf_);
-      } else {
-        string str;
-        output_expr_evals_[j]->PrintValue(value, &str);
-        buf->WriteBytes(str.size(), str.data());
-      }
-    } else {
-      // NULLs in hive are encoded based on the 'serialization.null.format' property.
-      const string& null_val = table_desc_->null_column_value();
-      buf->WriteBytes(null_val.size(), null_val.data());
-    }
-    // Append field delimiter.
-    if (j + 1 < num_non_partition_cols) {
-      buf->WriteByte(field_delim_);
-    }
-  }
-}
-
-inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
-  ++unflushed_rows_;
-  row_buf_.Clear();
-  if (compress_flag_ && !record_compression_) {
-    // Output row for a block compressed sequence file.
-    // Value block: Write the length as a vlong and then write the contents.
-    EncodeRow(row, &row_buf_);
-    out_.WriteVLong(row_buf_.Size());
-    out_.WriteBytes(row_buf_.Size(), row_buf_.String().data());
-    // Value-lengths block: Write the number of bytes we have just written to out_ as
-    // vlong
-    out_value_lengths_block_.WriteVLong(
-        ReadWriteUtil::VLongRequiredBytes(row_buf_.Size()) + row_buf_.Size());
-    return Status::OK();
-  }
-
-  EncodeRow(row, &row_buf_);
-
-  const uint8_t* value_bytes;
-  int64_t value_length;
-  string text = row_buf_.String();
-  if (compress_flag_) {
-    // apply compression to row_buf_
-    // the length of the buffer must be prefixed to the buffer prior to compression
-    //
-    // TODO this incurs copy overhead to place the length in front of the
-    // buffer prior to compression. We may want to rewrite to avoid copying.
-    row_buf_.Clear();
-    // encoding as "Text" writes the length before the text
-    row_buf_.WriteText(text.size(), reinterpret_cast<const uint8_t*>(&text.data()[0]));
-    text = row_buf_.String();
-    uint8_t *tmp;
-    {
-      SCOPED_TIMER(parent_->compress_timer());
-      RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
-          reinterpret_cast<const uint8_t*>(text.data()), &value_length, &tmp));
-    }
-    value_bytes = tmp;
-  } else {
-    value_length = text.size();
-    DCHECK_EQ(value_length, row_buf_.Size());
-    value_bytes = reinterpret_cast<const uint8_t*>(text.data());
-  }
-
-  int rec_len = value_length;
-  // if the record is compressed, the length is part of the compressed text
-  // if not, then we need to write the length (below) and account for it's size
-  if (!compress_flag_) {
-    rec_len += ReadWriteUtil::VLongRequiredBytes(value_length);
-  }
-  // The record contains the key, account for it's size (we use "\0\0\0\0" byte sequence
-  // as a key just like Hive).
-  rec_len += 4;
-
-  // Length of the record (incl. key and value length)
-  out_.WriteInt(rec_len);
-
-  // Write length of the key and the key
-  out_.WriteInt(4);
-  out_.WriteBytes(4, "\0\0\0\0");
-
-  // if the record is compressed, the length is part of the compressed text
-  if (!compress_flag_) out_.WriteVLong(value_length);
-
-  // write out the value (possibly compressed)
-  out_.WriteBytes(value_length, value_bytes);
-  return Status::OK();
-}
-
-Status HdfsSequenceTableWriter::Flush() {
-  if (unflushed_rows_ == 0) return Status::OK();
-
-  SCOPED_TIMER(parent_->hdfs_write_timer());
-
-  if (compress_flag_ && !record_compression_) {
-    RETURN_IF_ERROR(WriteCompressedBlock());
-  } else {
-    string out_str = out_.String();
-    RETURN_IF_ERROR(
-        Write(reinterpret_cast<const uint8_t*>(out_str.data()), out_str.size()));
-  }
-  out_.Clear();
-  out_value_lengths_block_.Clear();
-  unflushed_rows_ = 0;
-  return Status::OK();
-}
-
-void HdfsSequenceTableWriter::Close() {
-  // TODO: double check there is no memory leak.
-  parent_->mem_tracker()->Release(approx_block_size_);
-  mem_pool_->FreeAll();
-}
-
-} // namespace impala
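
The record and block framing written above (row counts, key-lengths, keys, value-lengths, values) uses Hadoop's "zero-compressed" variable-length integers, which the removed WriteStream::WriteVLong/WriteVInt produced. A standalone sketch of that encoding, assuming the layout of Hadoop's WritableUtils.writeVLong: small values fit in one byte, larger ones get a sign/length prefix byte followed by big-endian magnitude bytes.

#include <cstdint>
#include <string>

// Appends a Hadoop "zero-compressed" VLong. Values in [-112, 127] take a single
// byte; otherwise a prefix byte encodes sign and byte count (-113..-120 for
// positive, -121..-128 for negative), followed by the magnitude in big-endian order.
void WriteHadoopVLong(int64_t i, std::string* out) {
  if (i >= -112 && i <= 127) {
    out->push_back(static_cast<char>(i));
    return;
  }
  int len = -112;
  uint64_t v = static_cast<uint64_t>(i);
  if (i < 0) {
    v = ~v;        // one's complement; the sign lives in the prefix byte
    len = -120;
  }
  for (uint64_t tmp = v; tmp != 0; tmp >>= 8) --len;
  out->push_back(static_cast<char>(len));
  int num_bytes = (len < -120) ? -(len + 120) : -(len + 112);
  for (int idx = num_bytes; idx != 0; --idx) {
    out->push_back(static_cast<char>((v >> ((idx - 1) * 8)) & 0xFF));
  }
}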

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-sequence-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h
deleted file mode 100644
index f315920..0000000
--- a/be/src/exec/hdfs-sequence-table-writer.h
+++ /dev/null
@@ -1,194 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_EXEC_HDFS_SEQUENCE_WRITER_H
-#define IMPALA_EXEC_HDFS_SEQUENCE_WRITER_H
-
-#include <hdfs.h>
-#include <sstream>
-
-#include "runtime/descriptors.h"
-#include "exec/hdfs-table-sink.h"
-#include "exec/hdfs-table-writer.h"
-#include "util/codec.h"
-#include "write-stream.h"
-
-namespace impala {
-
-class Expr;
-class TupleDescriptor;
-class TupleRow;
-class RuntimeState;
-struct StringValue;
-struct OutputPartition;
-
-/// Sequence files are flat files consisting of binary key/value pairs. Essentially there
-/// are 3 different formats for sequence files depending on the 'compression_codec' and
-/// 'seq_compression_mode' query options:
-/// - Uncompressed sequence file format
-/// - Record-compressed sequence file format
-/// - Block-compressed sequence file format
-/// All of them share a common header described below.
-///
-/// Sequence File Header
-/// --------------------
-/// - version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number
-///   (e.g. SEQ4 or SEQ6)
-/// - keyClassName - key class
-/// - valueClassName - value class
-/// - compression - A boolean which specifies if compression is turned on for keys/values
-///   in this file.
-/// - blockCompression - A boolean which specifies if block-compression is turned on for
-///   keys/values in this file.
-/// - compression codec - compression codec class which is used for compression of keys
-///   and/or values (if compression is enabled).
-/// - metadata - SequenceFile.Metadata for this file.
-/// - sync - A 16 byte sync marker to denote end of the header.
-///
-/// Uncompressed Sequence File Format
-/// ---------------------------------
-/// - Header
-/// - Record
-///   - Record length
-///   - Key length
-///   - Key
-///   - Value
-/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so.
-///
-/// Record-Compressed Sequence File Format
-/// --------------------------------------
-/// - Header
-/// - Record
-///   - Record length
-///   - Key length
-///   - Key
-///   - Compressed Value
-/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so.
-///
-/// Block-Compressed Sequence File Format
-/// -------------------------------------
-/// - Header
-/// - Record Block
-///   - Uncompressed number of records in the block
-///   - Compressed key-lengths block-size
-///   - Compressed key-lengths block
-///   - Compressed keys block-size
-///   - Compressed keys block
-///   - Compressed value-lengths block-size
-///   - Compressed value-lengths block
-///   - Compressed values block-size
-///   - Compressed values block
-/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every block.
-/// The compressed blocks of key lengths and value lengths consist of the actual lengths
-/// of individual keys/values encoded in zero-compressed integer format.
-
-/// Consumes rows and outputs the rows into a sequence file in HDFS
-/// Output is buffered to fill sequence file blocks.
-class HdfsSequenceTableWriter : public HdfsTableWriter {
- public:
-  HdfsSequenceTableWriter(HdfsTableSink* parent, RuntimeState* state,
-      OutputPartition* output, const HdfsPartitionDescriptor* partition,
-      const HdfsTableDescriptor* table_desc);
-
-  ~HdfsSequenceTableWriter() { }
-
-  virtual Status Init();
-  virtual Status Finalize() { return Flush(); }
-  virtual Status InitNewFile() { return WriteFileHeader(); }
-  virtual void Close();
-  virtual uint64_t default_block_size() const { return 0; }
-  virtual std::string file_extension() const { return "seq"; }
-
-  /// Outputs the given rows into an HDFS sequence file. The rows are buffered
-  /// to fill a sequence file block.
-  virtual Status AppendRows(
-      RowBatch* rows, const std::vector<int32_t>& row_group_indices, bool* new_file);
-
- private:
-  /// processes a single row, delegates to Compress or NoCompress ConsumeRow().
-  inline Status ConsumeRow(TupleRow* row);
-
-  /// writes the SEQ file header to HDFS
-  Status WriteFileHeader();
-
-  /// writes the contents of out_value_lengths_block_ and out_ as a single
-  /// block-compressed record.
-  Status WriteCompressedBlock();
-
-  /// writes the tuple row to the given buffer; separates fields by field_delim_,
-  /// escapes string.
-  inline void EncodeRow(TupleRow* row, WriteStream* buf);
-
-  /// writes the str_val to the buffer, escaping special characters
-  inline void WriteEscapedString(const StringValue* str_val, WriteStream* buf);
-
-  /// flushes the output -- clearing out_ and writing to HDFS
-  /// if compress_flag_, will write contents of out_ as a single compressed block
-  Status Flush();
-
-  /// desired size of each block (bytes); actual block size will vary +/- the
-  /// size of a row; this is before compression is applied.
-  uint64_t approx_block_size_;
-
-  /// buffer which holds accumulated output
-  WriteStream out_;
-
-  /// buffer which holds accumulated value-lengths output (used with block-compressed
-  /// sequence files)
-  WriteStream out_value_lengths_block_;
-
-  /// Temporary Buffer for a single row
-  WriteStream row_buf_;
-
-  /// memory pool used by codec to allocate output buffer
-  boost::scoped_ptr<MemPool> mem_pool_;
-
-  /// true if compression is enabled
-  bool compress_flag_;
-
-  /// number of rows consumed since last flush
-  uint64_t unflushed_rows_;
-
-  /// name of codec, only set if compress_flag_
-  std::string codec_name_;
-  /// the codec for compressing, only set if compress_flag_
-  boost::scoped_ptr<Codec> compressor_;
-
-  /// true if compression is applied on each record individually
-  bool record_compression_;
-
-  /// Character delimiting fields
-  char field_delim_;
-
-  /// Escape character for text encoding
-  char escape_char_;
-
-  /// 16 byte sync marker (a uuid)
-  std::string sync_marker_;
-  /// A -1 infront of the sync marker, used in decompressed formats
-  std::string neg1_sync_marker_;
-
-  /// Name of java class to use when reading the keys
-  static const char* KEY_CLASS_NAME;
-  /// Name of java class to use when reading the values
-  static const char* VALUE_CLASS_NAME;
-  /// Magic characters used to identify the file type
-  static const uint8_t SEQ6_CODE[4];
-};
-
-} // namespace impala
-#endif
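
The three layouts documented above share one record framing in the uncompressed case: a 4-byte big-endian record length, a 4-byte key length, the key itself (Impala, like Hive, wrote a constant 4-byte "\0\0\0\0" key), and the value prefixed by its Hadoop VLong length. A sketch of one such record, assuming a value short enough (<= 127 bytes) that its VLong prefix is a single byte; see the VLong sketch after hdfs-sequence-table-writer.cc for the general case:

#include <cstdint>
#include <string>

// Appends a 4-byte big-endian int, as java.io.DataOutput::writeInt would.
void WriteBigEndianInt(int32_t v, std::string* out) {
  for (int shift = 24; shift >= 0; shift -= 8) {
    out->push_back(static_cast<char>((static_cast<uint32_t>(v) >> shift) & 0xFF));
  }
}

// Frames one record of an uncompressed sequence file. Assumes value.size() <= 127
// so its Hadoop VLong length prefix is a single byte equal to the length.
void AppendUncompressedRecord(const std::string& value, std::string* out) {
  const int kKeyLen = 4;    // constant "\0\0\0\0" key, as Hive writes
  const int kVLongLen = 1;  // single-byte VLong for small values
  int32_t record_len = kKeyLen + kVLongLen + static_cast<int32_t>(value.size());
  WriteBigEndianInt(record_len, out);           // record length (key + value bytes)
  WriteBigEndianInt(kKeyLen, out);              // key length
  out->append("\0\0\0\0", 4);                   // the key
  out->push_back(static_cast<char>(value.size()));  // VLong(value length), one byte
  out->append(value);                           // the delimited-text row
}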

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index b6de7cf..9c46638 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -18,8 +18,6 @@
 #include "exec/hdfs-table-sink.h"
 #include "exec/hdfs-table-writer.h"
 #include "exec/hdfs-text-table-writer.h"
-#include "exec/hdfs-sequence-table-writer.h"
-#include "exec/hdfs-avro-table-writer.h"
 #include "exec/hdfs-parquet-table-writer.h"
 #include "exec/exec-node.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
@@ -469,28 +467,20 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
 
   output_partition->partition_descriptor = &partition_descriptor;
 
-  bool allow_unsupported_formats =
-      state->query_options().__isset.allow_unsupported_formats &&
-      state->query_options().allow_unsupported_formats;
-  if (!allow_unsupported_formats) {
-    if (partition_descriptor.file_format() == THdfsFileFormat::SEQUENCE_FILE ||
-        partition_descriptor.file_format() == THdfsFileFormat::AVRO) {
-      stringstream error_msg;
-      map<int, const char*>::const_iterator i =
-          _THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format());
-      error_msg << "Writing to table format " << i->second
-          << " is not supported. Use query option ALLOW_UNSUPPORTED_FORMATS"
-          " to override.";
-      return Status(error_msg.str());
-    }
-    if (partition_descriptor.file_format() == THdfsFileFormat::TEXT &&
-        state->query_options().__isset.compression_codec &&
-        state->query_options().compression_codec != THdfsCompression::NONE) {
-      stringstream error_msg;
-      error_msg << "Writing to compressed text table is not supported. "
-          "Use query option ALLOW_UNSUPPORTED_FORMATS to override.";
-      return Status(error_msg.str());
-    }
+  if (partition_descriptor.file_format() == THdfsFileFormat::SEQUENCE_FILE ||
+      partition_descriptor.file_format() == THdfsFileFormat::AVRO) {
+    stringstream error_msg;
+    map<int, const char*>::const_iterator i =
+        _THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format());
+    error_msg << "Writing to table format " << i->second << " is not supported.";
+    return Status(error_msg.str());
+  }
+  if (partition_descriptor.file_format() == THdfsFileFormat::TEXT &&
+      state->query_options().__isset.compression_codec &&
+      state->query_options().compression_codec != THdfsCompression::NONE) {
+    stringstream error_msg;
+    error_msg << "Writing to compressed text table is not supported. ";
+    return Status(error_msg.str());
   }
 
   // It is incorrect to initialize a writer if there are no rows to feed it. The writer
@@ -508,16 +498,6 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
           new HdfsParquetTableWriter(
               this, state, output_partition, &partition_descriptor, table_desc_));
       break;
-    case THdfsFileFormat::SEQUENCE_FILE:
-      output_partition->writer.reset(
-          new HdfsSequenceTableWriter(
-              this, state, output_partition, &partition_descriptor, table_desc_));
-      break;
-    case THdfsFileFormat::AVRO:
-      output_partition->writer.reset(
-          new HdfsAvroTableWriter(
-              this, state, output_partition, &partition_descriptor, table_desc_));
-      break;
     default:
       stringstream error_msg;
       map<int, const char*>::const_iterator i =

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-text-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-table-writer.cc b/be/src/exec/hdfs-text-table-writer.cc
index aaee773..f09b161 100644
--- a/be/src/exec/hdfs-text-table-writer.cc
+++ b/be/src/exec/hdfs-text-table-writer.cc
@@ -25,8 +25,6 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-value.inline.h"
-#include "util/codec.h"
-#include "util/compress.h"
 #include "util/hdfs-util.h"
 #include "util/runtime-profile-counters.h"
 
@@ -35,13 +33,6 @@
 
 #include "common/names.h"
 
-// Hdfs block size for compressed text.
-static const int64_t COMPRESSED_BLOCK_SIZE = 64 * 1024 * 1024;
-
-// Size to buffer before compression. We want this to be less than the block size
-// (compressed text is not splittable).
-static const int64_t COMPRESSED_BUFFERED_SIZE = 60 * 1024 * 1024;
-
 namespace impala {
 
 HdfsTextTableWriter::HdfsTextTableWriter(HdfsTableSink* parent,
@@ -61,41 +52,17 @@ HdfsTextTableWriter::HdfsTextTableWriter(HdfsTableSink* parent,
 }
 
 Status HdfsTextTableWriter::Init() {
-  const TQueryOptions& query_options = state_->query_options();
-  codec_ = THdfsCompression::NONE;
-  if (query_options.__isset.compression_codec) {
-    codec_ = query_options.compression_codec;
-    if (codec_ == THdfsCompression::SNAPPY) {
-      // hadoop.io.codec always means SNAPPY_BLOCKED. Alias the two.
-      codec_ = THdfsCompression::SNAPPY_BLOCKED;
-    }
-  }
-
-  if (codec_ != THdfsCompression::NONE) {
-    mem_pool_.reset(new MemPool(parent_->mem_tracker()));
-    RETURN_IF_ERROR(Codec::CreateCompressor(
-        mem_pool_.get(), true, codec_, &compressor_));
-    flush_size_ = COMPRESSED_BUFFERED_SIZE;
-  } else {
-    flush_size_ = HDFS_FLUSH_WRITE_SIZE;
-  }
   parent_->mem_tracker()->Consume(flush_size_);
   return Status::OK();
 }
 
 void HdfsTextTableWriter::Close() {
   parent_->mem_tracker()->Release(flush_size_);
-  if (mem_pool_.get() != NULL) mem_pool_->FreeAll();
 }
 
-uint64_t HdfsTextTableWriter::default_block_size() const {
-  return compressor_.get() == NULL ? 0 : COMPRESSED_BLOCK_SIZE;
-}
+uint64_t HdfsTextTableWriter::default_block_size() const { return 0; }
 
-string HdfsTextTableWriter::file_extension() const {
-  if (compressor_.get() == NULL) return "";
-  return compressor_->file_extension();
-}
+string HdfsTextTableWriter::file_extension() const { return ""; }
 
 Status HdfsTextTableWriter::AppendRows(
     RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) {
@@ -152,12 +119,7 @@ Status HdfsTextTableWriter::AppendRows(
   }
 
   *new_file = false;
-  if (rowbatch_stringstream_.tellp() >= flush_size_) {
-    RETURN_IF_ERROR(Flush());
-
-    // If compressed, start a new file (compressed data is not splittable).
-    *new_file = compressor_.get() != NULL;
-  }
+  if (rowbatch_stringstream_.tellp() >= flush_size_) RETURN_IF_ERROR(Flush());
 
   return Status::OK();
 }
@@ -178,22 +140,9 @@ Status HdfsTextTableWriter::InitNewFile() {
 Status HdfsTextTableWriter::Flush() {
   string rowbatch_string = rowbatch_stringstream_.str();
   rowbatch_stringstream_.str(string());
-  const uint8_t* uncompressed_data =
+  const uint8_t* data =
       reinterpret_cast<const uint8_t*>(rowbatch_string.data());
-  int64_t uncompressed_len = rowbatch_string.size();
-  const uint8_t* data = uncompressed_data;
-  int64_t len = uncompressed_len;
-
-  if (compressor_.get() != NULL) {
-    SCOPED_TIMER(parent_->compress_timer());
-    uint8_t* compressed_data;
-    int64_t compressed_len;
-    RETURN_IF_ERROR(compressor_->ProcessBlock(false,
-        uncompressed_len, uncompressed_data,
-        &compressed_len, &compressed_data));
-    data = compressed_data;
-    len = compressed_len;
-  }
+  int64_t len = rowbatch_string.size();
 
   {
     SCOPED_TIMER(parent_->hdfs_write_timer());

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/exec/hdfs-text-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-table-writer.h b/be/src/exec/hdfs-text-table-writer.h
index 589ed23..e2f6135 100644
--- a/be/src/exec/hdfs-text-table-writer.h
+++ b/be/src/exec/hdfs-text-table-writer.h
@@ -87,15 +87,6 @@ class HdfsTextTableWriter : public HdfsTableWriter {
   /// Stringstream to buffer output.  The stream is cleared between HDFS
   /// Write calls to allow for the internal buffers to be reused.
   std::stringstream rowbatch_stringstream_;
-
-  /// Compression codec.
-  THdfsCompression::type codec_;
-
-  /// Compressor if compression is enabled.
-  boost::scoped_ptr<Codec> compressor_;
-
-  /// Memory pool to use with compressor_.
-  boost::scoped_ptr<MemPool> mem_pool_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index e5bc48d..b9bda60 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -208,8 +208,6 @@ TEST(QueryOptions, SetEnumOptions) {
       TParquetFallbackSchemaResolution, (POSITION, NAME)), true);
   TestEnumCase(options, CASE(parquet_array_resolution, TParquetArrayResolution,
       (THREE_LEVEL, TWO_LEVEL, TWO_LEVEL_THEN_THREE_LEVEL)), true);
-  TestEnumCase(options, CASE(seq_compression_mode, THdfsSeqCompressionMode,
-      (BLOCK, RECORD)), false);
   TestEnumCase(options, CASE(compression_codec, THdfsCompression,
       (NONE, GZIP, BZIP2, DEFAULT, SNAPPY, SNAPPY_BLOCKED)), false);
 #undef CASE

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 2e3415f..1063fef 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -226,25 +226,9 @@ Status impala::SetQueryOption(const string& key, const string& value,
       case TImpalaQueryOptions::NUM_SCANNER_THREADS:
         query_options->__set_num_scanner_threads(atoi(value.c_str()));
         break;
-      case TImpalaQueryOptions::ALLOW_UNSUPPORTED_FORMATS:
-        query_options->__set_allow_unsupported_formats(
-            iequals(value, "true") || iequals(value, "1"));
-        break;
       case TImpalaQueryOptions::DEBUG_ACTION:
         query_options->__set_debug_action(value.c_str());
         break;
-      case TImpalaQueryOptions::SEQ_COMPRESSION_MODE: {
-        if (iequals(value, "block")) {
-          query_options->__set_seq_compression_mode(THdfsSeqCompressionMode::BLOCK);
-        } else if (iequals(value, "record")) {
-          query_options->__set_seq_compression_mode(THdfsSeqCompressionMode::RECORD);
-        } else {
-          stringstream ss;
-          ss << "Invalid sequence file compression mode: " << value;
-          return Status(ss.str());
-        }
-        break;
-      }
       case TImpalaQueryOptions::COMPRESSION_CODEC: {
         if (iequals(value, "none")) {
           query_options->__set_compression_codec(THdfsCompression::NONE);

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index fce042c..01f6e74 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -44,8 +44,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TImpalaQueryOptions::ALLOW_ERASURE_CODED_FILES + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\
-      TQueryOptionLevel::DEPRECATED)\
+  REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
   QUERY_OPT_FN(batch_size, BATCH_SIZE, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(debug_action, DEBUG_ACTION, TQueryOptionLevel::DEVELOPMENT)\
   REMOVED_QUERY_OPT_FN(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT)\
@@ -74,7 +73,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(buffer_pool_limit, BUFFER_POOL_LIMIT, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE, TQueryOptionLevel::REGULAR)\
+  REMOVED_QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\
   QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS,\

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 6780138..120aebc 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -101,7 +101,6 @@ struct TQueryOptions {
   5: optional i32 num_nodes = NUM_NODES_ALL
   6: optional i64 max_scan_range_length = 0
   7: optional i32 num_scanner_threads = 0
-  9: optional bool allow_unsupported_formats = 0
   11: optional string debug_action = ""
   12: optional i64 mem_limit = 0
   14: optional CatalogObjects.THdfsCompression compression_codec
@@ -133,11 +132,6 @@ struct TQueryOptions {
   // has no plan hints, and at least one table is missing relevant stats.
   29: optional bool disable_unsafe_spills = 0
 
-  // Mode for compression; RECORD, or BLOCK
-  // This field only applies for certain file types and is ignored
-  // by all other file types.
-  30: optional CatalogObjects.THdfsSeqCompressionMode seq_compression_mode
-
   // If the number of rows that are processed for a single query is below the
   // threshold, it will be executed on the coordinator only with codegen disabled
   31: optional i32 exec_single_node_rows_threshold = 100

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 529af04..665144f 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -72,8 +72,7 @@ enum TImpalaQueryOptions {
   // Number of scanner threads.
   NUM_SCANNER_THREADS,
 
-  // If true, Impala will try to execute on file formats that are not fully supported yet
-  ALLOW_UNSUPPORTED_FORMATS,
+  ALLOW_UNSUPPORTED_FORMATS, // Removed
 
   DEFAULT_ORDER_BY_LIMIT, // Removed
 
@@ -110,8 +109,7 @@ enum TImpalaQueryOptions {
   // Leave blank to use default.
   COMPRESSION_CODEC,
 
-  // Mode for compressing sequence files; either BLOCK, RECORD, or DEFAULT
-  SEQ_COMPRESSION_MODE,
+  SEQ_COMPRESSION_MODE, // Removed
 
   // HBase scan query option. If set and > 0, HBASE_CACHING is the value for
   // "hbase.client.Scan.setCaching()" when querying HBase table. Otherwise, use backend

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 54ad57f..b671a1e 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -390,7 +390,6 @@ public class PlannerTestBase extends FrontendTestBase {
   protected TQueryOptions defaultQueryOptions() {
     TQueryOptions options = new TQueryOptions();
     options.setExplain_level(TExplainLevel.STANDARD);
-    options.setAllow_unsupported_formats(true);
     options.setExec_single_node_rows_threshold(0);
     return options;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/bad_avro_snap/README
----------------------------------------------------------------------
diff --git a/testdata/bad_avro_snap/README b/testdata/bad_avro_snap/README
index 6271967..71eb398 100644
--- a/testdata/bad_avro_snap/README
+++ b/testdata/bad_avro_snap/README
@@ -1,6 +1,6 @@
 String Data
 -----------
-Created by modifying Impala's HdfsAvroTableWriter.
+Created by modifying Impala's HdfsAvroTableWriter(removed).
 
 These files' schemas have a single nullable string column 's'.
 
@@ -14,7 +14,7 @@ truncated_string.avro: contains one value, which is missing the last byte.
 
 Float Data
 ----------
-Created by modifying Impala's HdfsAvroTableWriter.
+Created by modifying Impala's HdfsAvroTableWriter(removed).
 
 These files' schemas have a single nullable float column 'c1'.
 

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test b/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test
deleted file mode 100644
index 6dc0899..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/avro-writer.test
+++ /dev/null
@@ -1,43 +0,0 @@
-====
----- QUERY
-drop table if exists __avro_write;
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-create table __avro_write (i int, s string, d double)
-stored as AVRO
-TBLPROPERTIES ('avro.schema.literal'='{
-                  "name": "my_record",
-                  "type": "record",
-                  "fields": [
-                  {"name":"i", "type":["int", "null"]},
-                  {"name":"s", "type":["string", "null"]},
-                  {"name":"d", "type":["double", "null"]}]}');
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __avro_write select 0, "a", 1.1;
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __avro_write select 1, "b", 2.2;
-====
----- QUERY
-select * from __avro_write;
----- RESULTS
-0,'a',1.1
-1,'b',2.2
----- TYPES
-INT,STRING,DOUBLE
-====
----- QUERY
-SET ALLOW_UNSUPPORTED_FORMATS=0;
-insert into __avro_write select 1, "b", 2.2;
----- CATCH
-Writing to table format AVRO is not supported. Use query option ALLOW_UNSUPPORTED_FORMATS
-====
----- QUERY
-drop table __avro_write;
-====

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
deleted file mode 100644
index 7e2363f..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
+++ /dev/null
@@ -1,308 +0,0 @@
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-create table __seq_write (i int, s string, d double)
-stored as SEQUENCEFILE;
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write select 0, "a", 1.1;
-====
----- QUERY
-SET COMPRESSION_CODEC=DEFAULT;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (1, "b", 2.2);
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (2, "c", 3.3);
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (3, "d", 4.4);
-====
----- QUERY
-SET COMPRESSION_CODEC=GZIP;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (4, "e", 5.5);
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write select 5, "a", 1.1;
-====
----- QUERY
-SET COMPRESSION_CODEC=DEFAULT;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (6, "b", 2.2);
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (7, "c", 3.3);
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (8, "d", 4.4);
-====
----- QUERY
-SET COMPRESSION_CODEC=GZIP;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __seq_write values (9, "e", 5.5);
-====
----- QUERY
-SET ALLOW_UNSUPPORTED_FORMATS=0;
-insert into __seq_write values (4, "e", 5.5);
----- CATCH
-Writing to table format SEQUENCE_FILE is not supported. Use query option
-====
----- QUERY
-select * from __seq_write;
----- RESULTS
-0,'a',1.1
-1,'b',2.2
-2,'c',3.3
-3,'d',4.4
-4,'e',5.5
-5,'a',1.1
-6,'b',2.2
-7,'c',3.3
-8,'d',4.4
-9,'e',5.5
----- TYPES
-INT,STRING,DOUBLE
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with NONE+RECORD and then read
-# it back
-SET COMPRESSION_CODEC=NONE;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_none_rec like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_none_rec partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
-      (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_none_rec;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with DEFAULT+RECORD and then
-# read it back
-SET COMPRESSION_CODEC=DEFAULT;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_def_rec like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_def_rec partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
-      (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_def_rec;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+RECORD and
-# then read it back
-SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_snapb_rec like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_snapb_rec partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
-      (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_snapb_rec;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with SNAPPY+RECORD and then read
-# it back
-SET COMPRESSION_CODEC=SNAPPY;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_snap_rec like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_snap_rec partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
-      (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_snap_rec;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with GZIP+RECORD and then read
-# it back
-SET COMPRESSION_CODEC=GZIP;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_gzip_rec like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_gzip_rec partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
-      (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_gzip_rec;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with NONE+BLOCK and then read it
-# back
-SET COMPRESSION_CODEC=NONE;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_none_block like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_none_block partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
-      (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_none_block;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with DEFAULT+BLOCK and then read
-# it back
-SET COMPRESSION_CODEC=DEFAULT;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_def_block like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_def_block partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
-      (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_def_block;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+BLOCK and
-# then read it back
-SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_snapb_block like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_snapb_block partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
-      (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_snapb_block;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with SNAPPY+BLOCK and then read
-# it back
-SET COMPRESSION_CODEC=SNAPPY;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_snap_block like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_snap_block partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
-      (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_snap_block;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-3079: Create a table containing larger seq files with GZIP+BLOCK and then read it
-# back
-SET COMPRESSION_CODEC=GZIP;
-SET SEQ_COMPRESSION_MODE=BLOCK;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table store_sales_seq_gzip_block like tpcds_parquet.store_sales
-stored as SEQUENCEFILE;
-insert into store_sales_seq_gzip_block partition(ss_sold_date_sk)
-select * from tpcds_parquet.store_sales
-where (ss_sold_date_sk between 2451175 and 2451200) or
-      (ss_sold_date_sk is null and ss_sold_time_sk > 60000);
-====
----- QUERY
-select count(*) from store_sales_seq_gzip_block;
----- RESULTS
-60091
----- TYPES
-BIGINT
-====
----- QUERY
-# IMPALA-5407: Create a table containing seq files with GZIP+RECORD. If the number of
-# impalad workers is three, three files will be created, two of which are large enough
-# (> 64MB) to force multiple flushes. Make sure that the files have been created
-# successfully.
-SET COMPRESSION_CODEC=GZIP;
-SET SEQ_COMPRESSION_MODE=RECORD;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-create table catalog_sales_seq_gzip_rec like tpcds.catalog_sales stored as SEQUENCEFILE;
-insert into catalog_sales_seq_gzip_rec select * from tpcds.catalog_sales;
-====
----- QUERY
-select count(*) from catalog_sales_seq_gzip_rec;
----- RESULTS
-1441548
----- TYPES
-BIGINT
-====

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/set.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index 5a2c56a..ffb53a1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -8,7 +8,6 @@ set buffer_pool_limit=7;
 set all;
 ---- RESULTS: VERIFY_IS_SUBSET
 'ABORT_ON_ERROR','0','REGULAR'
-'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED'
 'BATCH_SIZE','0','DEVELOPMENT'
 'BUFFER_POOL_LIMIT','','ADVANCED'
 'DEBUG_ACTION','','DEVELOPMENT'
@@ -34,7 +33,6 @@ set explain_level=3;
 set all;
 ---- RESULTS: VERIFY_IS_SUBSET
 'ABORT_ON_ERROR','0','REGULAR'
-'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED'
 'BATCH_SIZE','0','DEVELOPMENT'
 'BUFFER_POOL_LIMIT','','ADVANCED'
 'DEBUG_ACTION','','DEVELOPMENT'
@@ -60,7 +58,6 @@ set explain_level='0';
 set all;
 ---- RESULTS: VERIFY_IS_SUBSET
 'ABORT_ON_ERROR','0','REGULAR'
-'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED'
 'BATCH_SIZE','0','DEVELOPMENT'
 'BUFFER_POOL_LIMIT','','ADVANCED'
 'DEBUG_ACTION','','DEVELOPMENT'

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/text-writer.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/text-writer.test b/testdata/workloads/functional-query/queries/QueryTest/text-writer.test
deleted file mode 100644
index 89cd730..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/text-writer.test
+++ /dev/null
@@ -1,47 +0,0 @@
-====
----- QUERY
-drop table if exists __text_write;
-====
----- QUERY
-create table __text_write (i int, s string, d double);
-====
----- QUERY
-SET COMPRESSION_CODEC=NONE;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __text_write select 0, "a", 1.1;
-====
----- QUERY
-SET COMPRESSION_CODEC=DEFAULT;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __text_write values (1, "b", 2.2);
-====
----- QUERY
-SET COMPRESSION_CODEC=SNAPPY;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __text_write values (2, "c", 3.3);
-====
----- QUERY
-SET COMPRESSION_CODEC=GZIP;
-SET ALLOW_UNSUPPORTED_FORMATS=1;
-insert into __text_write values (3, "d", 4.4);
-====
----- QUERY
-SET COMPRESSION_CODEC=GZIP;
-SET ALLOW_UNSUPPORTED_FORMATS=0;
-insert into __text_write values (3, "d", 4.4);
----- CATCH
-Writing to compressed text table is not supported.
-====
----- QUERY
-select * from __text_write;
----- RESULTS
-0,'a',1.1
-1,'b',2.2
-2,'c',3.3
-3,'d',4.4
----- TYPES
-INT,STRING,DOUBLE
-====
----- QUERY
-drop table __text_write;
-====

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test b/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test
new file mode 100644
index 0000000..68f355f
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/unsupported-writers.test
@@ -0,0 +1,77 @@
+====
+---- QUERY
+create table __text_write (i int, s string, d double);
+====
+---- QUERY
+SET COMPRESSION_CODEC=NONE;
+insert into __text_write select 0, "a", 1.1;
+====
+---- QUERY
+SET COMPRESSION_CODEC=GZIP;
+insert into __text_write values (3, "d", 4.4);
+---- CATCH
+Writing to compressed text table is not supported.
+====
+---- QUERY
+select * from __text_write;
+---- RESULTS
+0,'a',1.1
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+create table __avro_write (i int, s string, d double)
+stored as AVRO
+TBLPROPERTIES ('avro.schema.literal'='{
+                  "name": "my_record",
+                  "type": "record",
+                  "fields": [
+                  {"name":"i", "type":["int", "null"]},
+                  {"name":"s", "type":["string", "null"]},
+                  {"name":"d", "type":["double", "null"]}]}');
+====
+---- QUERY
+insert into __avro_write select 1, "b", 2.2;
+---- CATCH
+Writing to table format AVRO is not supported.
+====
+---- QUERY
+create table __seq_write (i int, s string, d double)
+stored as SEQUENCEFILE;
+====
+---- QUERY
+insert into __seq_write values (4, "e", 5.5);
+---- CATCH
+Writing to table format SEQUENCE_FILE is not supported.
+====
+---- QUERY
+# Test writing to a mixed-format table containing partitions in both supported and
+# unsupported formats; writing to the partition with the supported format should succeed.
+# Create a table containing both text (supported) and Avro (unsupported) partitions.
+create table __mixed_format_write (id int) partitioned by (part int);
+====
+---- QUERY
+insert into __mixed_format_write partition(part=2000) values(1);
+====
+---- QUERY
+insert into __mixed_format_write partition(part=2001) values(2);
+====
+---- QUERY
+alter table __mixed_format_write partition (part=2001) set fileformat AVRO;
+====
+---- QUERY
+insert into __mixed_format_write partition(part=2000) values(3);
+====
+---- QUERY
+insert into __mixed_format_write partition(part=2001) values(4);
+---- CATCH
+Writing to table format AVRO is not supported.
+====
+---- QUERY
+select id, part from __mixed_format_write where part = 2000;
+---- RESULTS
+1,2000
+3,2000
+---- TYPES
+INT,INT
+====

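The mixed-format scenario above hinges on the writer check being applied per partition rather than per table. Outside the test harness, SHOW PARTITIONS reports each partition's file format, which makes it easy to see which partition the writer will reject; a minimal interactive sketch along the lines of the test (the table name is illustrative):

create table mixed_demo (id int) partitioned by (part int);
insert into mixed_demo partition (part=2000) values (1);
insert into mixed_demo partition (part=2001) values (2);
alter table mixed_demo partition (part=2001) set fileformat AVRO;
-- The Format column shows TEXT for part=2000 and AVRO for part=2001.
show partitions mixed_demo;
-- Still succeeds: the target partition is text.
insert into mixed_demo partition (part=2000) values (3);
-- Fails with "Writing to table format AVRO is not supported."
insert into mixed_demo partition (part=2001) values (4);
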
http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/common/test_dimensions.py
----------------------------------------------------------------------
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 785cfa9..0460ea7 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -108,19 +108,6 @@ def create_parquet_dimension(workload):
   return ImpalaTestDimension('table_format',
       TableFormatInfo.create_from_string(dataset, 'parquet/none'))
 
-# Available Exec Options:
-#01: abort_on_error (bool)
-#02 max_errors (i32)
-#03: disable_codegen (bool)
-#04: batch_size (i32)
-#05: return_as_ascii (bool)
-#06: num_nodes (i32)
-#07: max_scan_range_length (i64)
-#08: num_scanner_threads (i32)
-#09: max_io_buffers (i32)
-#10: allow_unsupported_formats (bool)
-#11: partition_agg (bool)
-
 # Common sets of values for the exec option vectors
 ALL_BATCH_SIZES = [0]
 

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/hs2/test_hs2.py
----------------------------------------------------------------------
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index cd861e9..795f45c 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -89,11 +89,10 @@ class TestHS2(HS2TestSuite):
     # Should be unchanged
     assert vals2["SYNC_DDL"] == "0"
 
-    # Verify that 'DEVELOPMENT' and 'DEPRECATED' options are not returned.
     assert "MAX_ERRORS" in vals2
     assert levels["MAX_ERRORS"] == "ADVANCED"
+    # Verify that 'DEVELOPMENT' options are not returned.
     assert "DEBUG_ACTION" not in vals2
-    assert "ALLOW_UNSUPPORTED_FORMATS" not in vals2
 
     # Removed options should not be returned.
     assert "MAX_IO_BUFFERS" not in vals2
@@ -101,7 +100,8 @@ class TestHS2(HS2TestSuite):
   @needs_session()
   def test_session_option_levels_via_set_all(self):
     """
-    Tests the level of session options returned by a SET ALL query.
+    Tests the level of session options returned by a SET ALL query. DEPRECATED options
+    are not covered because none currently remain.
     """
     vals, levels = self.get_session_options("SET ALL")
 
@@ -109,12 +109,10 @@ class TestHS2(HS2TestSuite):
     assert "SYNC_DDL" in vals
     assert "MAX_ERRORS" in vals
     assert "DEBUG_ACTION" in vals
-    assert "ALLOW_UNSUPPORTED_FORMATS" in vals
     assert levels["COMPRESSION_CODEC"] == "REGULAR"
     assert levels["SYNC_DDL"] == "REGULAR"
     assert levels["MAX_ERRORS"] == "ADVANCED"
     assert levels["DEBUG_ACTION"] == "DEVELOPMENT"
-    assert levels["ALLOW_UNSUPPORTED_FORMATS"] == "DEPRECATED"
 
     # Removed options should not be returned.
     assert "MAX_IO_BUFFERS" not in vals

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/metadata/test_partition_metadata.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index 1d77aa5..d23e3f0 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -45,10 +45,7 @@ class TestPartitionMetadata(ImpalaTestSuite):
     # compression codecs.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         (v.get_value('table_format').file_format in ('text', 'parquet') and
-         v.get_value('table_format').compression_codec == 'none') or
-        (v.get_value('table_format').file_format in ('seq', 'avro') and
-         v.get_value('table_format').compression_codec == 'snap' and
-         v.get_value('table_format').compression_type == 'block'))
+         v.get_value('table_format').compression_codec == 'none'))
 
   @SkipIfLocal.hdfs_client # TODO: this dependency might not exist anymore
   def test_multiple_partitions_same_location(self, vector, unique_database):
@@ -70,9 +67,6 @@ class TestPartitionMetadata(ImpalaTestSuite):
     self.client.execute("alter table %s add partition (j=2) location '%s/p'"
         % (FQ_TBL_NAME, TBL_LOCATION))
 
-    # Allow unsupported avro and sequence file writer.
-    self.client.execute("set allow_unsupported_formats=true")
-
     # Insert some data. This will only update partition j=1 (IMPALA-1480).
     self.client.execute("insert into table %s partition(j=1) select 1" % FQ_TBL_NAME)
     # Refresh to update file metadata of both partitions
@@ -80,31 +74,19 @@ class TestPartitionMetadata(ImpalaTestSuite):
 
     # The data will be read twice because each partition points to the same location.
     data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
-    if file_format == 'avro':
-      # Avro writer is broken and produces nulls. Only check partition column.
-      assert data.split('\t')[1] == '3'
-    else:
-      assert data.split('\t') == ['2', '3']
+    assert data.split('\t') == ['2', '3']
 
     self.client.execute("insert into %s partition(j) select 1, 1" % FQ_TBL_NAME)
     self.client.execute("insert into %s partition(j) select 1, 2" % FQ_TBL_NAME)
     self.client.execute("refresh %s" % FQ_TBL_NAME)
     data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
-    if file_format == 'avro':
-      # Avro writer is broken and produces nulls. Only check partition column.
-      assert data.split('\t')[1] == '9'
-    else:
-      assert data.split('\t') == ['6', '9']
+    assert data.split('\t') == ['6', '9']
 
     # Force all scan ranges to be on the same node. It should produce the same
     # result as above. See IMPALA-5412.
     self.client.execute("set num_nodes=1")
     data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
-    if file_format == 'avro':
-      # Avro writer is broken and produces nulls. Only check partition column.
-      assert data.split('\t')[1] == '9'
-    else:
-      assert data.split('\t') == ['6', '9']
+    assert data.split('\t') == ['6', '9']
 
   @SkipIfS3.hive
   @SkipIfADLS.hive

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 694cfe9..2896632 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -129,71 +129,25 @@ class TestCompressedFormats(ImpalaTestSuite):
     finally:
       call(["hive", "-e", drop_cmd]);
 
-class TestTableWriters(ImpalaTestSuite):
+class TestUnsupportedTableWriters(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):
     return 'functional-query'
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestTableWriters, cls).add_test_dimensions()
+    super(TestUnsupportedTableWriters, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
-    # This class tests many formats, but doesn't use the contraints
-    # Each format is tested within one test file, we constrain to text/none
-    # as each test file only needs to be run once.
+    # This class tests writers for several file formats, so the table_format dimension is
+    # not used; the constraint below only ensures that the test file runs exactly once.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         (v.get_value('table_format').file_format =='text' and
         v.get_value('table_format').compression_codec == 'none'))
 
-  def test_seq_writer(self, vector, unique_database):
-    self.run_test_case('QueryTest/seq-writer', vector, unique_database)
-
-  @SkipIfS3.hive
-  @SkipIfADLS.hive
-  @SkipIfIsilon.hive
-  @SkipIfLocal.hive
-  def test_seq_writer_hive_compatibility(self, vector, unique_database):
-    self.client.execute('set ALLOW_UNSUPPORTED_FORMATS=1')
-    # Write sequence files with different compression codec/compression mode and then read
-    # it back in Impala and Hive.
-    # Note that we don't test snappy here as the snappy codec used by Impala does not seem
-    # to be fully compatible with the snappy codec used by Hive.
-    for comp_codec, comp_mode in [('NONE', 'RECORD'), ('NONE', 'BLOCK'),
-                                  ('DEFAULT', 'RECORD'), ('DEFAULT', 'BLOCK'),
-                                  ('GZIP', 'RECORD'), ('GZIP', 'BLOCK')]:
-      table_name = '%s.seq_tbl_%s_%s' % (unique_database, comp_codec, comp_mode)
-      self.client.execute('set COMPRESSION_CODEC=%s' % comp_codec)
-      self.client.execute('set SEQ_COMPRESSION_MODE=%s' % comp_mode)
-      self.client.execute('create table %s like functional.zipcode_incomes stored as '
-          'sequencefile' % table_name)
-      # Write sequence file of size greater than 4K
-      self.client.execute('insert into %s select * from functional.zipcode_incomes where '
-          'zip >= "5"' % table_name)
-      # Write sequence file of size less than 4K
-      self.client.execute('insert into %s select * from functional.zipcode_incomes where '
-          'zip="00601"' % table_name)
-
-      count_query = 'select count(*) from %s' % table_name
-
-      # Read it back in Impala
-      output = self.client.execute(count_query)
-      assert '16541' == output.get_data()
-      # Read it back in Hive
-      # Note that username is passed in for the sake of remote cluster tests. The default
-      # HDFS user is typically 'hdfs', and this is needed to run a count() operation using
-      # hive. For local mini clusters, the usename can be anything. See IMPALA-5413.
-      output = self.run_stmt_in_hive(count_query, username='hdfs')
-      assert '16541' == output.split('\n')[1]
-
-  def test_avro_writer(self, vector):
-    self.run_test_case('QueryTest/avro-writer', vector)
-
-  def test_text_writer(self, vector):
-    # TODO debug this test.
-    # This caused by a zlib failure. Suspected cause is too small a buffer
-    # passed to zlib for compression; similar to IMPALA-424
-    pytest.skip()
-    self.run_test_case('QueryTest/text-writer', vector)
+  def test_error_message(self, vector, unique_database):
+    # Tests that an appropriate error message is displayed for unsupported writers such as
+    # compressed text, Avro, and SequenceFile.
+    self.run_test_case('QueryTest/unsupported-writers', vector, unique_database)
 
 @pytest.mark.execute_serially
 class TestLargeCompressedFile(ImpalaTestSuite):

http://git-wip-us.apache.org/repos/asf/impala/blob/30e82c63/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index fe631cf..eac9d27 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -389,11 +389,10 @@ class TestImpalaShellInteractive(object):
     assert "APPX_COUNT_DISTINCT" in result.stdout
     assert "SUPPORT_START_OVER" in result.stdout
     # Development, deprecated and removed options should not be shown.
+    # Note: there are currently no deprecated query options.
     assert "Development Query Options:" not in result.stdout
-    assert "DEBUG_ACTION" not in result.stdout
-    assert "Deprecated Query Options:" not in result.stdout
-    assert "ALLOW_UNSUPPORTED_FORMATS" not in result.stdout
-    assert "MAX_IO_BUFFERS" not in result.stdout
+    assert "DEBUG_ACTION" not in result.stdout # Development option.
+    assert "MAX_IO_BUFFERS" not in result.stdout # Removed option.
 
     shell2 = ImpalaShell()
     shell2.send_cmd("set all")
@@ -401,7 +400,7 @@ class TestImpalaShellInteractive(object):
     assert "Query options (defaults shown in []):" in result.stdout
     assert "Advanced Query Options:" in result.stdout
     assert "Development Query Options:" in result.stdout
-    assert "Deprecated Query Options:" in result.stdout
+    assert "Deprecated Query Options:" not in result.stdout
     advanced_part_start_idx = result.stdout.find("Advanced Query Options")
     development_part_start_idx = result.stdout.find("Development Query Options")
     deprecated_part_start_idx = result.stdout.find("Deprecated Query Options")
@@ -411,7 +410,6 @@ class TestImpalaShellInteractive(object):
     assert "APPX_COUNT_DISTINCT" in advanced_part
     assert "SUPPORT_START_OVER" in advanced_part
     assert "DEBUG_ACTION" in development_part
-    assert "ALLOW_UNSUPPORTED_FORMATS" in result.stdout[deprecated_part_start_idx:]
     # Removed options should not be shown.
     assert "MAX_IO_BUFFERS" not in result.stdout