Posted to commits@impala.apache.org by kw...@apache.org on 2017/04/26 23:22:17 UTC
[05/10] incubator-impala git commit: IMPALA-3079: Fix sequence file writer
IMPALA-3079: Fix sequence file writer
This change fixes the following issues in the Sequence File Writer:
1. ReadWriteUtil::VLongRequiredBytes() and ReadWriteUtil::PutVLong()
were broken. As a result, Impala created corrupt uncompressed
sequence files.
2. KEY_CLASS_NAME was missing from the sequence file header. As a
result, Hive could not read back uncompressed sequence files
created by Impala.
3. Impala created record-compressed sequence files with an empty keys
block. As a result, Hive could not read back record-compressed
sequence files created by Impala.
4. Impala created block-compressed sequence files with:
- empty key-lengths block
- empty keys block
- empty value-lengths block
This resulted in invalid block-compressed sequence files that Hive could
not read back.
5. In some cases the wrong Record-compression flag was written to the
sequence file header. As a result, Hive could not read back record-
compressed sequence files created by Impala.
6. Impala added 'sync_marker' instead of 'neg1_sync_marker' to the
beginning of blocks in block-compressed sequence files. Hive could
not read these files back.
7. The calculation of block sizes in the SnappyBlockCompressor class was
incorrect for odd-length buffers.
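For illustration, the zero-compressed VLong encoding involved in items 1 and 2
(Hadoop's WritableUtils.writeVLong format) works as follows: values in [-112, 127]
occupy a single byte holding the value itself; anything else gets a marker byte
followed by big-endian payload bytes, with negative values stored as their one's
complement. For example, 300 is encoded as {0x8E, 0x01, 0x2C} (marker -114, so two
bytes follow) and -300 as {0x86, 0x01, 0x2B} (marker -122, payload ~(-300) = 299).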
Change-Id: I0db642ad35132a9a5a6611810a6cafbbe26e7487
Reviewed-on: http://gerrit.cloudera.org:8080/6107
Reviewed-by: Michael Ho <kw...@cloudera.com>
Reviewed-by: Attila Jeges <at...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/59b2db6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/59b2db6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/59b2db6b
Branch: refs/heads/master
Commit: 59b2db6ba722e5bef297bb4603519e06333ce5cb
Parents: edcc593
Author: Attila Jeges <at...@cloudera.com>
Authored: Mon Feb 20 18:07:25 2017 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Apr 25 21:07:53 2017 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-sequence-table-writer.cc | 105 +++++++---
be/src/exec/hdfs-sequence-table-writer.h | 71 ++++++-
be/src/exec/read-write-util-test.cc | 57 ++++++
be/src/exec/read-write-util.h | 64 +++++--
be/src/util/compress.cc | 35 ++--
be/src/util/decompress-test.cc | 9 +-
.../queries/QueryTest/seq-writer.test | 192 ++++++++++++++++++-
tests/query_test/test_compressed_formats.py | 39 +++-
8 files changed, 495 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index 3c522ba..9af7e0f 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -38,8 +38,10 @@
namespace impala {
-uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
+const uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
const char* HdfsSequenceTableWriter::VALUE_CLASS_NAME = "org.apache.hadoop.io.Text";
+const char* HdfsSequenceTableWriter::KEY_CLASS_NAME =
+ "org.apache.hadoop.io.BytesWritable";
HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent,
RuntimeState* state, OutputPartition* output,
@@ -130,24 +132,25 @@ Status HdfsSequenceTableWriter::AppendRows(
}
Status HdfsSequenceTableWriter::WriteFileHeader() {
- out_.WriteBytes(sizeof(SEQ6_CODE), reinterpret_cast<uint8_t*>(SEQ6_CODE));
+ out_.WriteBytes(sizeof(SEQ6_CODE), SEQ6_CODE);
- // Output an empty KeyClassName field
- out_.WriteEmptyText();
+ // Set up the correct key class
+ out_.WriteText(strlen(KEY_CLASS_NAME),
+ reinterpret_cast<const uint8_t*>(KEY_CLASS_NAME));
// Set up the correct value class
out_.WriteText(strlen(VALUE_CLASS_NAME),
- reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME));
+ reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME));
// Flag for if compression is used
out_.WriteBoolean(compress_flag_);
// Only valid if compression is used. Indicates if block compression is used.
- out_.WriteBoolean(!record_compression_);
+ out_.WriteBoolean(compress_flag_ && !record_compression_);
// Output the name of our compression codec, parsed by readers
if (compress_flag_) {
out_.WriteText(codec_name_.size(),
- reinterpret_cast<const uint8_t*>(codec_name_.data()));
+ reinterpret_cast<const uint8_t*>(codec_name_.data()));
}
// Metadata is formatted as an integer N followed by N*2 strings,
@@ -164,35 +167,63 @@ Status HdfsSequenceTableWriter::WriteFileHeader() {
}
Status HdfsSequenceTableWriter::WriteCompressedBlock() {
- WriteStream header;
+ WriteStream record;
+ uint8_t *output;
+ int64_t output_length;
DCHECK(compress_flag_);
- // add a sync marker to start of the block
- header.WriteBytes(sync_marker_.size(), sync_marker_.data());
+ // Add a sync marker to the start of the block
+ record.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
- header.WriteVLong(unflushed_rows_);
+ // Output the number of rows in this block
+ record.WriteVLong(unflushed_rows_);
- // Write Key Lengths and Key Values
- header.WriteEmptyText();
- header.WriteEmptyText();
+ // Output compressed key-lengths block-size & compressed key-lengths block.
+ // The key-lengths block contains the byte value 4 as the key length for each row
+ // (this is what Hive does).
+ string key_lengths_text(unflushed_rows_, '\x04');
+ {
+ SCOPED_TIMER(parent_->compress_timer());
+ RETURN_IF_ERROR(compressor_->ProcessBlock(false, key_lengths_text.size(),
+ reinterpret_cast<uint8_t*>(&key_lengths_text[0]), &output_length, &output));
+ }
+ record.WriteVInt(output_length);
+ record.WriteBytes(output_length, output);
- // Output an Empty string for value Lengths
- header.WriteEmptyText();
+ // Output compressed keys block-size & compressed keys block.
+ // The keys block contains the "\0\0\0\0" byte sequence as the key for each row (this
+ // is what Hive does).
+ string keys_text(unflushed_rows_ * 4, '\0');
+ {
+ SCOPED_TIMER(parent_->compress_timer());
+ RETURN_IF_ERROR(compressor_->ProcessBlock(false, keys_text.size(),
+ reinterpret_cast<uint8_t*>(&keys_text[0]), &output_length, &output));
+ }
+ record.WriteVInt(output_length);
+ record.WriteBytes(output_length, output);
- uint8_t *output;
- int64_t output_length;
+ // Output compressed value-lengths block-size & compressed value-lengths block
+ string value_lengths_text = out_value_lengths_block_.String();
+ {
+ SCOPED_TIMER(parent_->compress_timer());
+ RETURN_IF_ERROR(compressor_->ProcessBlock(false, value_lengths_text.size(),
+ reinterpret_cast<uint8_t*>(&value_lengths_text[0]), &output_length, &output));
+ }
+ record.WriteVInt(output_length);
+ record.WriteBytes(output_length, output);
+
+ // Output compressed values block-size & compressed values block
string text = out_.String();
{
SCOPED_TIMER(parent_->compress_timer());
RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
reinterpret_cast<uint8_t*>(&text[0]), &output_length, &output));
}
+ record.WriteVInt(output_length);
+ record.WriteBytes(output_length, output);
- header.WriteVInt(output_length);
- string head = header.String();
- RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(head.data()),
- head.size()));
- RETURN_IF_ERROR(Write(output, output_length));
+ string rec = record.String();
+ RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(rec.data()), rec.size()));
return Status::OK();
}
@@ -237,11 +268,15 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
++unflushed_rows_;
row_buf_.Clear();
if (compress_flag_ && !record_compression_) {
- // Output row for a block compressed sequence file
- // write the length as a vlong and then write the contents
+ // Output row for a block compressed sequence file.
+ // Value block: Write the length as a vlong and then write the contents.
EncodeRow(row, &row_buf_);
out_.WriteVLong(row_buf_.Size());
out_.WriteBytes(row_buf_.Size(), row_buf_.String().data());
+ // Value-lengths block: Write the number of bytes we have just written to out_ as a
+ // vlong.
+ out_value_lengths_block_.WriteVLong(
+ ReadWriteUtil::VLongRequiredBytes(row_buf_.Size()) + row_buf_.Size());
return Status::OK();
}
@@ -249,7 +284,7 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
const uint8_t* value_bytes;
int64_t value_length;
- if (record_compression_) {
+ if (compress_flag_) {
// apply compression to row_buf_
// the length of the buffer must be prefixed to the buffer prior to compression
//
@@ -275,16 +310,22 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
int rec_len = value_length;
// if the record is compressed, the length is part of the compressed text
// if not, then we need to write the length (below) and account for its size
- if (!record_compression_) rec_len += ReadWriteUtil::VLongRequiredBytes(value_length);
+ if (!compress_flag_) {
+ rec_len += ReadWriteUtil::VLongRequiredBytes(value_length);
+ }
+ // The record contains the key; account for its size (we use the "\0\0\0\0" byte
+ // sequence as the key, just like Hive).
+ rec_len += 4;
- // Length of the record (incl. key length and value length)
+ // Length of the record (incl. key and value length)
out_.WriteInt(rec_len);
- // Write length of the key (Impala/Hive doesn't write a key)
- out_.WriteInt(0);
+ // Write length of the key and the key
+ out_.WriteInt(4);
+ out_.WriteBytes(4, "\0\0\0\0");
// if the record is compressed, the length is part of the compressed text
- if (!record_compression_) out_.WriteVLong(value_length);
+ if (!compress_flag_) out_.WriteVLong(value_length);
// write out the value (possibly compressed)
out_.WriteBytes(value_length, value_bytes);
@@ -304,6 +345,8 @@ Status HdfsSequenceTableWriter::Flush() {
Write(reinterpret_cast<const uint8_t*>(out_str.data()), out_str.size()));
}
out_.Clear();
+ out_value_lengths_block_.Clear();
+ mem_pool_->FreeAll();
unflushed_rows_ = 0;
return Status::OK();
}
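To make the new on-disk layout concrete, the following is a minimal, self-contained
C++ sketch of the record that WriteCompressedBlock() now assembles. This is not
Impala's WriteStream API: WriteVLong here is a stand-alone reimplementation of the
Hadoop encoding, the Compress lambda is an identity placeholder for the configured
codec, and the real writer emits the four sub-block sizes as VInts (the same encoding
applied to 32-bit values).

#include <cstdint>
#include <string>

// Hadoop's zero-compressed VLong encoding (what ReadWriteUtil::PutVLong emits;
// see the read-write-util.h hunk below).
static void WriteVLong(std::string* out, int64_t val) {
  if (val >= -112 && val <= 127) {
    out->push_back(static_cast<char>(val));  // value stored inline
    return;
  }
  int first = -112;                          // marker base for positive longs
  uint64_t v = static_cast<uint64_t>(val);
  if (val < 0) {
    v = ~v;                                  // store the one's complement
    first = -120;                            // marker base for negative longs
  }
  int len = 0;
  for (uint64_t tmp = v; tmp != 0; tmp >>= 8) ++len;
  out->push_back(static_cast<char>(first - len));  // marker byte
  for (int i = len - 1; i >= 0; --i) {
    out->push_back(static_cast<char>((v >> (8 * i)) & 0xFF));  // big-endian payload
  }
}

// One block-compressed record, laid out as WriteCompressedBlock() now does.
static std::string BuildCompressedBlock(const std::string& sync_marker,
    int64_t num_rows, const std::string& value_lengths_block,
    const std::string& values_block) {
  auto Compress = [](const std::string& in) { return in; };  // placeholder codec
  std::string rec;
  rec.append("\xFF\xFF\xFF\xFF", 4);               // -1 escape before the sync marker
  rec.append(sync_marker);                         // 16-byte sync marker
  WriteVLong(&rec, num_rows);                      // uncompressed row count
  const std::string key_lengths(num_rows, '\x04'); // key length 4 for every row
  const std::string keys(num_rows * 4, '\0');      // "\0\0\0\0" key per row, like Hive
  for (const std::string* block : {&key_lengths, &keys,
                                   &value_lengths_block, &values_block}) {
    const std::string compressed = Compress(*block);
    WriteVLong(&rec, static_cast<int64_t>(compressed.size()));  // sub-block size
    rec.append(compressed);                                     // sub-block payload
  }
  return rec;
}

The fixed ordering (key-lengths, keys, value-lengths, values, each preceded by its
compressed size) and the dummy 4-byte keys are what let Hive read these files back.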
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/hdfs-sequence-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h
index 7f6a888..3d72926 100644
--- a/be/src/exec/hdfs-sequence-table-writer.h
+++ b/be/src/exec/hdfs-sequence-table-writer.h
@@ -36,6 +36,66 @@ class RuntimeState;
struct StringValue;
struct OutputPartition;
+/// Sequence files are flat files consisting of binary key/value pairs. Essentially there
+/// are 3 different formats for sequence files depending on the 'compression_codec' and
+/// 'seq_compression_mode' query options:
+/// - Uncompressed sequence file format
+/// - Record-compressed sequence file format
+/// - Block-compressed sequence file format
+/// All of them share a common header described below.
+///
+/// Sequence File Header
+/// --------------------
+/// - version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number
+/// (e.g. SEQ4 or SEQ6)
+/// - keyClassName - key class
+/// - valueClassName - value class
+/// - compression - A boolean which specifies if compression is turned on for keys/values
+/// in this file.
+/// - blockCompression - A boolean which specifies if block-compression is turned on for
+/// keys/values in this file.
+/// - compression codec - compression codec class which is used for compression of keys
+/// and/or values (if compression is enabled).
+/// - metadata - SequenceFile.Metadata for this file.
+/// - sync - A 16 byte sync marker to denote end of the header.
+///
+/// Uncompressed Sequence File Format
+/// ---------------------------------
+/// - Header
+/// - Record
+/// - Record length
+/// - Key length
+/// - Key
+/// - Value
+/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so.
+///
+/// Record-Compressed Sequence File Format
+/// --------------------------------------
+/// - Header
+/// - Record
+/// - Record length
+/// - Key length
+/// - Key
+/// - Compressed Value
+/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so.
+///
+/// Block-Compressed Sequence File Format
+/// -------------------------------------
+/// - Header
+/// - Record Block
+/// - Uncompressed number of records in the block
+/// - Compressed key-lengths block-size
+/// - Compressed key-lengths block
+/// - Compressed keys block-size
+/// - Compressed keys block
+/// - Compressed value-lengths block-size
+/// - Compressed value-lengths block
+/// - Compressed values block-size
+/// - Compressed values block
+/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every block.
+/// The compressed blocks of key lengths and value lengths consist of the actual lengths
+/// of individual keys/values encoded in zero-compressed integer format.
+
/// Consumes rows and outputs the rows into a sequence file in HDFS
/// Output is buffered to fill sequence file blocks.
class HdfsSequenceTableWriter : public HdfsTableWriter {
@@ -67,7 +127,8 @@ class HdfsSequenceTableWriter : public HdfsTableWriter {
/// writes the SEQ file header to HDFS
Status WriteFileHeader();
- /// writes the contents of out_ as a single compressed block
+ /// writes the contents of out_value_lengths_block_ and out_ as a single
+ /// block-compressed record.
Status WriteCompressedBlock();
/// writes the tuple row to the given buffer; separates fields by field_delim_,
@@ -88,6 +149,10 @@ class HdfsSequenceTableWriter : public HdfsTableWriter {
/// buffer which holds accumulated output
WriteStream out_;
+ /// buffer which holds accumulated value-lengths output (used with block-compressed
+ /// sequence files)
+ WriteStream out_value_lengths_block_;
+
/// Temporary Buffer for a single row
WriteStream row_buf_;
@@ -119,10 +184,12 @@ class HdfsSequenceTableWriter : public HdfsTableWriter {
/// A -1 in front of the sync marker, used in decompressed formats
std::string neg1_sync_marker_;
+ /// Name of java class to use when reading the keys
+ static const char* KEY_CLASS_NAME;
/// Name of java class to use when reading the values
static const char* VALUE_CLASS_NAME;
/// Magic characters used to identify the file type
- static uint8_t SEQ6_CODE[4];
+ static const uint8_t SEQ6_CODE[4];
};
} // namespace impala
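A matching sketch for one record of the uncompressed format described in the header
comment above, reusing the hypothetical WriteVLong helper from the previous sketch;
PutIntBE stands in for the writer's fixed-width big-endian int output:

#include <cstdint>
#include <string>

static void PutIntBE(std::string* out, uint32_t v) {
  for (int i = 3; i >= 0; --i) {
    out->push_back(static_cast<char>((v >> (8 * i)) & 0xFF));
  }
}

static std::string BuildUncompressedRecord(const std::string& row) {
  std::string vlen;
  WriteVLong(&vlen, static_cast<int64_t>(row.size()));  // value length as a VLong
  std::string rec;
  // Record length covers the 4-byte key, the value-length VLong and the value.
  PutIntBE(&rec, static_cast<uint32_t>(4 + vlen.size() + row.size()));
  PutIntBE(&rec, 4);                 // key length: now 4, no longer 0
  rec.append("\0\0\0\0", 4);         // the key itself, matching Hive
  rec += vlen;                       // omitted in the record-compressed format
  rec += row;                        // the encoded row
  return rec;
}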
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/read-write-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util-test.cc b/be/src/exec/read-write-util-test.cc
index 0f9c6ae..e4448de 100644
--- a/be/src/exec/read-write-util-test.cc
+++ b/be/src/exec/read-write-util-test.cc
@@ -56,6 +56,63 @@ TEST(ReadWriteUtil, BigEndian) {
TestBigEndian<uint64_t>(0xffffffffffffff);
}
+TEST(ReadWriteUtil, ZeroCompressedLongRequiredBytes) {
+ // Small longs stored in 1 byte
+ for (int64_t val = -112; val <= 127; val++) {
+ EXPECT_EQ(1, ReadWriteUtil::VLongRequiredBytes(val));
+ }
+ // Small longs stored in 2 bytes
+ for (int64_t val = -128; val < -112; val++) {
+ EXPECT_EQ(2, ReadWriteUtil::VLongRequiredBytes(val));
+ }
+ // Positive longs stored in 3-9 bytes
+ int64_t val = 0x7000ab00cd00ef00;
+ for (int sh = 0; sh <= 6; sh++) {
+ EXPECT_EQ(9 - sh, ReadWriteUtil::VLongRequiredBytes(val));
+ val = val >> 8;
+ }
+ // Negative longs stored in 3-9 bytes
+ val = 0x8000ab00cd00ef00;
+ for (int sh = 0; sh <= 6; sh++) {
+ EXPECT_EQ(9 - sh, ReadWriteUtil::VLongRequiredBytes(val));
+ val = val >> 8;
+ }
+ // Max/min long is stored in 9 bytes
+ EXPECT_EQ(9, ReadWriteUtil::VLongRequiredBytes(0x7fffffffffffffff));
+ EXPECT_EQ(9, ReadWriteUtil::VLongRequiredBytes(0x8000000000000000));
+}
+
+void TestPutGetZeroCompressedLong(int64_t val) {
+ uint8_t buffer[9];
+ int64_t read_val;
+ int64_t num_bytes = ReadWriteUtil::PutVLong(val, buffer);
+ int64_t read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val);
+ EXPECT_EQ(read_bytes, num_bytes);
+ EXPECT_EQ(read_val, val);
+}
+
+TEST(ReadWriteUtil, ZeroCompressedLong) {
+ // 1 byte longs
+ for (int64_t val = -128; val <= 127; val++) {
+ TestPutGetZeroCompressedLong(val);
+ }
+ // 2+ byte positive longs
+ int64_t val = 0x70100000200000ab;
+ for (int sh = 0; sh <= 6; sh++) {
+ TestPutGetZeroCompressedLong(val);
+ val = val >> 8;
+ }
+ // 2+ byte negative longs
+ val = 0x80100000200000ab;
+ for (int sh = 0; sh <= 6; sh++) {
+ TestPutGetZeroCompressedLong(val);
+ val = val >> 8;
+ }
+ // Max/min long
+ TestPutGetZeroCompressedLong(0x7fffffffffffffff);
+ TestPutGetZeroCompressedLong(0x8000000000000000);
+}
+
}
IMPALA_TEST_MAIN();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/read-write-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util.h b/be/src/exec/read-write-util.h
index c384839..84d41dd 100644
--- a/be/src/exec/read-write-util.h
+++ b/be/src/exec/read-write-util.h
@@ -60,11 +60,11 @@ class ReadWriteUtil {
static int GetVInt(uint8_t* buf, int32_t* vint);
/// Writes a variable-length Long or int value to a byte buffer.
- /// Returns the number of bytes written
+ /// Returns the number of bytes written.
static int64_t PutVLong(int64_t val, uint8_t* buf);
static int64_t PutVInt(int32_t val, uint8_t* buf);
- /// returns size of the encoded long value, not including the 1 byte for length
+ /// Returns size of the encoded long value, including the 1 byte for length.
static int VLongRequiredBytes(int64_t val);
/// Read a variable-length Long value from a byte buffer starting at the specified
@@ -211,41 +211,63 @@ inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong)
return len;
}
+// Returns the size of the encoded long value, including the 1 byte for length when
+// val < -112 or val > 127.
inline int ReadWriteUtil::VLongRequiredBytes(int64_t val) {
- // returns size of the encoded long value, not including the 1 byte for length
- if (val & 0xFF00000000000000llu) return 8;
- if (val & 0x00FF000000000000llu) return 7;
- if (val & 0x0000FF0000000000llu) return 6;
- if (val & 0x000000FF00000000llu) return 5;
- if (val & 0x00000000FF000000llu) return 4;
- if (val & 0x0000000000FF0000llu) return 3;
- if (val & 0x000000000000FF00llu) return 2;
- // Values between -112 and 127 are stored using 1 byte,
- // values between -127 and -112 are stored using 2 bytes
- // See ReadWriteUtil::DecodeVIntSize for this case
- if (val < -112) return 2;
- return 1;
+ if (val >= -112 && val <= 127) return 1;
+ // If 'val' is negative, take the one's complement.
+ if (val < 0) val = ~val;
+ return 9 - __builtin_clzll(val)/8;
}
+// Serializes 'val' to a binary stream with zero-compressed encoding. For -112<=val<=127,
+// only one byte is used, holding the actual value. For other values of 'val', the first
+// byte indicates whether the long is positive or negative and how many bytes follow. If
+// the first byte value v is between -113 and -120, the following long is positive and
+// -(v+112) bytes follow. If v is between -121 and -128, the following long is negative
+// and -(v+120) bytes follow. Bytes are stored in high-non-zero-byte-first order.
+// Returns the number of bytes written.
+// For more information, see the documentation for 'WritableUtils.writeVLong()' method:
+// https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/WritableUtils.html
inline int64_t ReadWriteUtil::PutVLong(int64_t val, uint8_t* buf) {
int64_t num_bytes = VLongRequiredBytes(val);
if (num_bytes == 1) {
+ DCHECK(val >= -112 && val <= 127);
// store the value itself instead of the length
buf[0] = static_cast<int8_t>(val);
return 1;
}
// This is how we encode the length for a length less than or equal to 8
- buf[0] = -119 + num_bytes;
+ DCHECK_GE(num_bytes, 2);
+ DCHECK_LE(num_bytes, 9);
+ if (val < 0) {
+ DCHECK_LT(val, -112);
+ // The first byte in 'buf' should contain a value between -121 and -128 that makes the
+ // following condition true: -(buf[0] + 120) == num_bytes - 1.
+ // Note that 'num_bytes' includes the 1 extra byte for length.
+ buf[0] = -(num_bytes + 119);
+ // If 'val' is negative, take the one's complement.
+ // See the source code for WritableUtils.writeVLong() method:
+ // https://hadoop.apache.org/docs/r2.7.2/api/src-html/org/apache/hadoop/io/
+ // WritableUtils.html#line.271
+ val = ~val;
+ } else {
+ DCHECK_GT(val, 127);
+ // The first byte in 'buf' should contain a value between -113 and -120 that makes the
+ // following condition true: -(buf[0] + 112) == num_bytes - 1.
+ // Note that 'num_bytes' includes the 1 extra byte for length.
+ buf[0] = -(num_bytes + 111);
+ }
- // write to buffer in reversed endianness
- for (int i = 0; i < num_bytes; ++i) {
- buf[i+1] = (val >> (8 * (num_bytes - i - 1))) & 0xFF;
+ // write to the buffer in big-endian order
+ for (int i = 1; i < num_bytes; ++i) {
+ buf[i] = (val >> (8 * (num_bytes - i - 1))) & 0xFF;
}
- // +1 for the length byte
- return num_bytes + 1;
+ return num_bytes;
}
inline int64_t ReadWriteUtil::PutVInt(int32_t val, uint8_t* buf) {
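For reference, a sketch of the matching decode path (the codebase's GetVLong, not shown
in this diff, does the equivalent); DecodeVLong is a hypothetical stand-alone helper,
with the worked bytes for +/-5000 in main():

#include <cassert>
#include <cstdint>

static int DecodeVLong(const uint8_t* buf, int64_t* out) {
  int8_t first = static_cast<int8_t>(buf[0]);
  if (first >= -112) {               // the value was stored inline
    *out = first;
    return 1;
  }
  bool negative = first < -120;      // markers -121..-128 mean a negative long
  int len = negative ? -(first + 120) : -(first + 112);  // payload byte count
  uint64_t v = 0;
  for (int i = 1; i <= len; ++i) v = (v << 8) | buf[i];  // big-endian payload
  *out = negative ? static_cast<int64_t>(~v) : static_cast<int64_t>(v);
  return len + 1;                    // payload plus the marker byte
}

int main() {
  // 5000 = 0x1388 takes two payload bytes, so PutVLong writes marker -114
  // (0x8E) then 0x13 0x88; -5000 stores ~(-5000) = 4999 = 0x1387 behind the
  // negative marker -122 (0x86).
  const uint8_t pos[] = {0x8E, 0x13, 0x88};
  const uint8_t neg[] = {0x86, 0x13, 0x87};
  int64_t val = 0;
  assert(DecodeVLong(pos, &val) == 3 && val == 5000);
  assert(DecodeVLong(neg, &val) == 3 && val == -5000);
  return 0;
}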
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/util/compress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index 429ae66..efa39bf 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -204,13 +204,27 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
int64_t input_length, const uint8_t* input, int64_t *output_length,
uint8_t** output) {
DCHECK_GE(input_length, 0);
- // Hadoop uses a block compression scheme on top of snappy. First there is
- // an integer which is the size of the decompressed data followed by a
- // sequence of compressed blocks each preceded with an integer size.
- // For testing purposes we are going to generate two blocks.
- int64_t block_size = input_length / 2;
- size_t length = snappy::MaxCompressedLength(block_size) * 2;
- length += 3 * sizeof (int32_t);
+ // Hadoop uses a block compression scheme on top of snappy. The layout is as follows:
+ // - size of the entire decompressed data (4 bytes)
+ // - size of the 1st compressed block (4 bytes)
+ // - 1st compressed block
+ // - size of the 2nd compressed block (4 bytes)
+ // - 2nd compressed block
+ // ...
+ // For testing purposes we are going to generate two blocks if input_length >= 4K.
+ vector<int64_t> block_sizes;
+ size_t length;
+ if (input_length == 0) {
+ length = sizeof (int32_t);
+ } else if (input_length < 4 * 1024) {
+ block_sizes.push_back(input_length);
+ length = snappy::MaxCompressedLength(block_sizes[0]) + 2 * sizeof (int32_t);
+ } else {
+ block_sizes.push_back(input_length / 2);
+ block_sizes.push_back(input_length - block_sizes[0]);
+ length = snappy::MaxCompressedLength(block_sizes[0]) +
+ snappy::MaxCompressedLength(block_sizes[1]) + 3 * sizeof (int32_t);
+ }
DCHECK(!output_preallocated || length <= *output_length);
if (output_preallocated) {
@@ -222,13 +236,12 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
}
uint8_t* outp = out_buffer_;
- uint8_t* sizep;
ReadWriteUtil::PutInt(outp, static_cast<uint32_t>(input_length));
outp += sizeof (int32_t);
- while (input_length > 0) {
+ for (int64_t block_size: block_sizes) {
// TODO: should this be a while or a do-while loop? Check what Hadoop does.
// Point at the spot to store the compressed size.
- sizep = outp;
+ uint8_t* sizep = outp;
outp += sizeof (int32_t);
size_t size;
snappy::RawCompress(reinterpret_cast<const char*>(input),
@@ -236,8 +249,8 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size));
input += block_size;
- input_length -= block_size;
outp += size;
+ DCHECK_LE(outp - out_buffer_, length);
}
*output = out_buffer_;
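The odd-length failure mode of the old code, for illustration: a 5-byte input gave
block_size = 5/2 = 2, and the while loop then ran three times (5, 3, 1 bytes
remaining), emitting three size-prefixed blocks into a buffer sized for two. Below is
a sketch of a reader for the layout described above, using the public snappy API; this
is illustrative and not Impala's decompressor:

#include <cstdint>
#include <string>
#include <snappy.h>

// Parses [total decompressed size][block size][block]... with 4-byte
// big-endian sizes, as SnappyBlockCompressor writes them.
static bool DecodeHadoopSnappy(const uint8_t* in, size_t in_len, std::string* out) {
  auto GetIntBE = [](const uint8_t* p) {
    return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
           (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
  };
  if (in_len < 4) return false;
  const uint32_t total = GetIntBE(in);        // size of all decompressed data
  size_t pos = 4;
  out->clear();
  while (pos + 4 <= in_len) {
    const uint32_t block_len = GetIntBE(in + pos);
    pos += 4;
    if (pos + block_len > in_len) return false;
    std::string piece;
    if (!snappy::Uncompress(reinterpret_cast<const char*>(in + pos), block_len,
                            &piece)) {
      return false;
    }
    out->append(piece);
    pos += block_len;
  }
  return out->size() == total;  // an empty input is just the 4-byte zero total
}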
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 7709bb1..1f84bad 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -73,8 +73,15 @@ class DecompressorTest : public ::testing::Test {
DecompressInsufficientOutputBuffer(compressor.get(), decompressor.get(),
sizeof(input_), input_);
} else {
- CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_),
+ CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_), input_);
+ // Test with odd-length input (to test the calculation of block-sizes in
+ // SnappyBlockCompressor)
+ CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_) - 1,
input_);
+ // Test with input length of 1024 (to test SnappyBlockCompressor with a single
+ // block)
+ CompressAndDecompress(compressor.get(), decompressor.get(), 1024, input_);
+ // Test with empty input
if (format != THdfsCompression::BZIP2) {
CompressAndDecompress(compressor.get(), decompressor.get(), 0, NULL);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
index fe4d829..528b83e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
@@ -1,8 +1,5 @@
====
---- QUERY
-drop table if exists __seq_write;
-====
----- QUERY
SET COMPRESSION_CODEC=NONE;
SET ALLOW_UNSUPPORTED_FORMATS=1;
SET SEQ_COMPRESSION_MODE=BLOCK;
@@ -92,5 +89,192 @@ select * from __seq_write;
INT,STRING,DOUBLE
====
---- QUERY
-drop table __seq_write;
+# IMPALA-3079: Create a table containing larger seq files with NONE+RECORD and then read
+# it back
+SET COMPRESSION_CODEC=NONE;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_none_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_none_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_none_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with DEFAULT+RECORD and then
+# read it back
+SET COMPRESSION_CODEC=DEFAULT;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_def_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_def_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_def_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+RECORD and
+# then read it back
+SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snapb_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snapb_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snapb_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY+RECORD and then read
+# it back
+SET COMPRESSION_CODEC=SNAPPY;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snap_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snap_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snap_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with GZIP+RECORD and then read
+# it back
+SET COMPRESSION_CODEC=GZIP;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_gzip_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_gzip_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_gzip_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with NONE+BLOCK and then read it
+# back
+SET COMPRESSION_CODEC=NONE;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_none_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_none_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_none_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with DEFAULT+BLOCK and then read
+# it back
+SET COMPRESSION_CODEC=DEFAULT;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_def_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_def_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_def_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+BLOCK and
+# then read it back
+SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snapb_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snapb_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snapb_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY+BLOCK and then read
+# it back
+SET COMPRESSION_CODEC=SNAPPY;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snap_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snap_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snap_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with GZIP+BLOCK and then read it
+# back
+SET COMPRESSION_CODEC=GZIP;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_gzip_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_gzip_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_gzip_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
====
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 654ea48..36dc427 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -144,18 +144,43 @@ class TestTableWriters(ImpalaTestSuite):
(v.get_value('table_format').file_format =='text' and
v.get_value('table_format').compression_codec == 'none'))
- def test_seq_writer(self, vector):
- # TODO debug this test, same as seq writer.
- # This caused by a zlib failure. Suspected cause is too small a buffer
- # passed to zlib for compression; similar to IMPALA-424
- pytest.skip()
- self.run_test_case('QueryTest/seq-writer', vector)
+ def test_seq_writer(self, vector, unique_database):
+ self.run_test_case('QueryTest/seq-writer', vector, unique_database)
+
+ def test_seq_writer_hive_compatibility(self, vector, unique_database):
+ self.client.execute('set ALLOW_UNSUPPORTED_FORMATS=1')
+ # Write sequence files with different compression codecs/modes and then read
+ # them back in Impala and Hive.
+ # Note that we don't test snappy here as the snappy codec used by Impala does not seem
+ # to be fully compatible with the snappy codec used by Hive.
+ for comp_codec, comp_mode in [('NONE', 'RECORD'), ('NONE', 'BLOCK'),
+ ('DEFAULT', 'RECORD'), ('DEFAULT', 'BLOCK'),
+ ('GZIP', 'RECORD'), ('GZIP', 'BLOCK')]:
+ table_name = '%s.seq_tbl_%s_%s' % (unique_database, comp_codec, comp_mode)
+ self.client.execute('set COMPRESSION_CODEC=%s' % comp_codec)
+ self.client.execute('set SEQ_COMPRESSION_MODE=%s' % comp_mode)
+ self.client.execute('create table %s like functional.zipcode_incomes stored as '
+ 'sequencefile' % table_name)
+ # Write sequence file of size greater than 4K
+ self.client.execute('insert into %s select * from functional.zipcode_incomes where '
+ 'zip >= "5"' % table_name)
+ # Write sequence file of size less than 4K
+ self.client.execute('insert into %s select * from functional.zipcode_incomes where '
+ 'zip="00601"' % table_name)
+ # Read it back in Impala
+ output = self.client.execute('select count(*) from %s' % table_name)
+ assert '16541' == output.get_data()
+ # Read it back in Hive
+ output = self.run_stmt_in_hive('select count(*) from %s' % table_name)
+ assert '16541' == output.split('\n')[1]
def test_avro_writer(self, vector):
self.run_test_case('QueryTest/avro-writer', vector)
def test_text_writer(self, vector):
- # TODO debug this test, same as seq writer.
+ # TODO debug this test.
+ # This is caused by a zlib failure. Suspected cause is too small a buffer
+ # passed to zlib for compression; similar to IMPALA-424
pytest.skip()
self.run_test_case('QueryTest/text-writer', vector)