Posted to commits@impala.apache.org by kw...@apache.org on 2017/04/26 23:22:17 UTC

[05/10] incubator-impala git commit: IMPALA-3079: Fix sequence file writer

IMPALA-3079: Fix sequence file writer

This change fixes the following issues in the Sequence File Writer:
1. ReadWriteUtil::VLongRequiredBytes() and ReadWriteUtil::PutVLong()
   were broken. As a result, Impala created corrupt uncompressed
   sequence files. (A sketch of the intended encoding follows this list.)

2. KEY_CLASS_NAME was missing from the sequence file header. As a
   result, Hive could not read back uncompressed sequence files
   created by Impala.

3. Impala created record-compressed sequence files with an empty keys
   block. As a result, Hive could not read back record-compressed
   sequence files created by Impala.

4. Impala created block-compressed files with:
   - empty key-lengths block
   - empty keys block
   - empty value-lengths block
   This resulted in invalid block-compressed sequence files that Hive could
   not read back.

5. In some cases the wrong Record-compression flag was written to the
   sequence file header. As a result, Hive could not read back record-
   compressed sequence files created by Impala.

6. Impala added 'sync_marker' instead of 'neg1_sync_marker' to the
   beginning of blocks in block-compressed sequence files. Hive could
   not read these files back.

7. The calculation of block sizes in the SnappyBlockCompressor class was
   incorrect for odd-length buffers.
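
For illustration, the sketch below re-implements the zero-compressed encoding
that the fixed VLongRequiredBytes()/PutVLong() are meant to produce (the format
of Hadoop's WritableUtils.writeVLong). It is an independent sketch written for
this note, not Impala's ReadWriteUtil code, and the file and function names are
invented. Worked examples: 128 encodes as 8f 80, and -113 as 87 70.

  // vlong_sketch.cc: standalone illustration of Hadoop's zero-compressed
  // VLong encoding (WritableUtils.writeVLong). Not Impala's ReadWriteUtil.
  #include <cstdint>
  #include <cstdio>

  // Encodes 'val' into 'buf' (at most 9 bytes); returns the bytes written.
  static int WriteVLong(int64_t val, uint8_t* buf) {
    if (val >= -112 && val <= 127) {
      buf[0] = static_cast<uint8_t>(val);  // small values fit in one byte
      return 1;
    }
    int first = -112;
    if (val < 0) {
      val = ~val;        // one's complement for negative values
      first = -120;
    }
    int64_t tmp = val;
    while (tmp != 0) {   // count payload bytes by shifting the value away
      tmp >>= 8;
      --first;
    }
    buf[0] = static_cast<uint8_t>(first);  // length/sign marker byte
    int payload = (first < -120) ? -(first + 120) : -(first + 112);
    for (int i = 0; i < payload; ++i) {    // big-endian payload bytes
      buf[1 + i] = static_cast<uint8_t>(val >> (8 * (payload - 1 - i)));
    }
    return payload + 1;
  }

  int main() {
    const int64_t samples[] = {0, 127, 128, -112, -113, 1000000, -1000000};
    for (int64_t v : samples) {
      uint8_t buf[9];
      int n = WriteVLong(v, buf);
      printf("%lld ->", static_cast<long long>(v));
      for (int i = 0; i < n; ++i) printf(" %02x", buf[i]);
      printf("\n");
    }
    return 0;
  }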

Change-Id: I0db642ad35132a9a5a6611810a6cafbbe26e7487
Reviewed-on: http://gerrit.cloudera.org:8080/6107
Reviewed-by: Michael Ho <kw...@cloudera.com>
Reviewed-by: Attila Jeges <at...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/59b2db6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/59b2db6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/59b2db6b

Branch: refs/heads/master
Commit: 59b2db6ba722e5bef297bb4603519e06333ce5cb
Parents: edcc593
Author: Attila Jeges <at...@cloudera.com>
Authored: Mon Feb 20 18:07:25 2017 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Apr 25 21:07:53 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-sequence-table-writer.cc       | 105 +++++++---
 be/src/exec/hdfs-sequence-table-writer.h        |  71 ++++++-
 be/src/exec/read-write-util-test.cc             |  57 ++++++
 be/src/exec/read-write-util.h                   |  64 +++++--
 be/src/util/compress.cc                         |  35 ++--
 be/src/util/decompress-test.cc                  |   9 +-
 .../queries/QueryTest/seq-writer.test           | 192 ++++++++++++++++++-
 tests/query_test/test_compressed_formats.py     |  39 +++-
 8 files changed, 495 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index 3c522ba..9af7e0f 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -38,8 +38,10 @@
 
 namespace impala {
 
-uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
+const uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
 const char* HdfsSequenceTableWriter::VALUE_CLASS_NAME = "org.apache.hadoop.io.Text";
+const char* HdfsSequenceTableWriter::KEY_CLASS_NAME =
+    "org.apache.hadoop.io.BytesWritable";
 
 HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent,
                         RuntimeState* state, OutputPartition* output,
@@ -130,24 +132,25 @@ Status HdfsSequenceTableWriter::AppendRows(
 }
 
 Status HdfsSequenceTableWriter::WriteFileHeader() {
-  out_.WriteBytes(sizeof(SEQ6_CODE), reinterpret_cast<uint8_t*>(SEQ6_CODE));
+  out_.WriteBytes(sizeof(SEQ6_CODE), SEQ6_CODE);
 
-  // Output an empty KeyClassName field
-  out_.WriteEmptyText();
+  // Setup to be correct key class
+  out_.WriteText(strlen(KEY_CLASS_NAME),
+      reinterpret_cast<const uint8_t*>(KEY_CLASS_NAME));
 
   // Setup to be correct value class
   out_.WriteText(strlen(VALUE_CLASS_NAME),
-                 reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME));
+      reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME));
 
   // Flag for if compression is used
   out_.WriteBoolean(compress_flag_);
   // Only valid if compression is used. Indicates if block compression is used.
-  out_.WriteBoolean(!record_compression_);
+  out_.WriteBoolean(compress_flag_ && !record_compression_);
 
   // Output the name of our compression codec, parsed by readers
   if (compress_flag_) {
     out_.WriteText(codec_name_.size(),
-                   reinterpret_cast<const uint8_t*>(codec_name_.data()));
+        reinterpret_cast<const uint8_t*>(codec_name_.data()));
   }
 
   // Meta data is formated as an integer N followed by N*2 strings,
@@ -164,35 +167,63 @@ Status HdfsSequenceTableWriter::WriteFileHeader() {
 }
 
 Status HdfsSequenceTableWriter::WriteCompressedBlock() {
-  WriteStream header;
+  WriteStream record;
+  uint8_t *output;
+  int64_t output_length;
   DCHECK(compress_flag_);
 
-  // add a sync marker to start of the block
-  header.WriteBytes(sync_marker_.size(), sync_marker_.data());
+  // Add a sync marker to start of the block
+  record.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
 
-  header.WriteVLong(unflushed_rows_);
+  // Output the number of rows in this block
+  record.WriteVLong(unflushed_rows_);
 
-  // Write Key Lengths and Key Values
-  header.WriteEmptyText();
-  header.WriteEmptyText();
+  // Output compressed key-lengths block-size & compressed key-lengths block.
+  // The key-lengths block contains the byte value 4 as the key length for each row (this is
+  // what Hive does).
+  string key_lengths_text(unflushed_rows_, '\x04');
+  {
+    SCOPED_TIMER(parent_->compress_timer());
+    RETURN_IF_ERROR(compressor_->ProcessBlock(false, key_lengths_text.size(),
+        reinterpret_cast<uint8_t*>(&key_lengths_text[0]), &output_length, &output));
+  }
+  record.WriteVInt(output_length);
+  record.WriteBytes(output_length, output);
 
-  // Output an Empty string for value Lengths
-  header.WriteEmptyText();
+  // Output compressed keys block-size & compressed keys block.
+  // The keys block contains "\0\0\0\0" byte sequence as a key for each row (this is what
+  // Hive does).
+  string keys_text(unflushed_rows_ * 4, '\0');
+  {
+    SCOPED_TIMER(parent_->compress_timer());
+    RETURN_IF_ERROR(compressor_->ProcessBlock(false, keys_text.size(),
+        reinterpret_cast<uint8_t*>(&keys_text[0]), &output_length, &output));
+  }
+  record.WriteVInt(output_length);
+  record.WriteBytes(output_length, output);
 
-  uint8_t *output;
-  int64_t output_length;
+  // Output compressed value-lengths block-size & compressed value-lengths block
+  string value_lengths_text = out_value_lengths_block_.String();
+  {
+    SCOPED_TIMER(parent_->compress_timer());
+    RETURN_IF_ERROR(compressor_->ProcessBlock(false, value_lengths_text.size(),
+        reinterpret_cast<uint8_t*>(&value_lengths_text[0]), &output_length, &output));
+  }
+  record.WriteVInt(output_length);
+  record.WriteBytes(output_length, output);
+
+  // Output compressed values block-size & compressed values block
   string text = out_.String();
   {
     SCOPED_TIMER(parent_->compress_timer());
     RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
         reinterpret_cast<uint8_t*>(&text[0]), &output_length, &output));
   }
+  record.WriteVInt(output_length);
+  record.WriteBytes(output_length, output);
 
-  header.WriteVInt(output_length);
-  string head = header.String();
-  RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(head.data()),
-                        head.size()));
-  RETURN_IF_ERROR(Write(output, output_length));
+  string rec = record.String();
+  RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(rec.data()), rec.size()));
   return Status::OK();
 }
 
@@ -237,11 +268,15 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
   ++unflushed_rows_;
   row_buf_.Clear();
   if (compress_flag_ && !record_compression_) {
-    // Output row for a block compressed sequence file
-    // write the length as a vlong and then write the contents
+    // Output row for a block compressed sequence file.
+    // Value block: Write the length as a vlong and then write the contents.
     EncodeRow(row, &row_buf_);
     out_.WriteVLong(row_buf_.Size());
     out_.WriteBytes(row_buf_.Size(), row_buf_.String().data());
+    // Value-lengths block: Write the number of bytes we have just written to out_ as
+    // vlong
+    out_value_lengths_block_.WriteVLong(
+        ReadWriteUtil::VLongRequiredBytes(row_buf_.Size()) + row_buf_.Size());
     return Status::OK();
   }
 
@@ -249,7 +284,7 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
 
   const uint8_t* value_bytes;
   int64_t value_length;
-  if (record_compression_) {
+  if (compress_flag_) {
     // apply compression to row_buf_
     // the length of the buffer must be prefixed to the buffer prior to compression
     //
@@ -275,16 +310,22 @@ inline Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
   int rec_len = value_length;
   // if the record is compressed, the length is part of the compressed text
   // if not, then we need to write the length (below) and account for it's size
-  if (!record_compression_) rec_len += ReadWriteUtil::VLongRequiredBytes(value_length);
+  if (!compress_flag_) {
+    rec_len += ReadWriteUtil::VLongRequiredBytes(value_length);
+  }
+  // The record contains the key, account for its size (we use "\0\0\0\0" byte sequence
+  // as a key just like Hive).
+  rec_len += 4;
 
-  // Length of the record (incl. key length and value length)
+  // Length of the record (incl. key and value length)
   out_.WriteInt(rec_len);
 
-  // Write length of the key (Impala/Hive doesn't write a key)
-  out_.WriteInt(0);
+  // Write length of the key and the key
+  out_.WriteInt(4);
+  out_.WriteBytes(4, "\0\0\0\0");
 
   // if the record is compressed, the length is part of the compressed text
-  if (!record_compression_) out_.WriteVLong(value_length);
+  if (!compress_flag_) out_.WriteVLong(value_length);
 
   // write out the value (possibly compressed)
   out_.WriteBytes(value_length, value_bytes);
@@ -304,6 +345,8 @@ Status HdfsSequenceTableWriter::Flush() {
         Write(reinterpret_cast<const uint8_t*>(out_str.data()), out_str.size()));
   }
   out_.Clear();
+  out_value_lengths_block_.Clear();
+  mem_pool_->FreeAll();
   unflushed_rows_ = 0;
   return Status::OK();
 }
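
As a side note on the rewritten WriteCompressedBlock() above: the key material it
compresses is fully synthetic. The standalone snippet below reproduces those byte
patterns (a 0x04 length byte per row in the key-lengths block, and a four-byte
all-zero BytesWritable key per row in the keys block, mirroring Hive). 'num_rows'
is a made-up stand-in for the writer's row counter; this is an illustration, not
the writer code. In the writer these two strings are compressed and written with
VInt length prefixes ahead of the value-lengths and values blocks.

  // key_blocks_sketch.cc: the uncompressed key material that a block-compressed
  // sequence-file writer hands to the compressor. Illustration only.
  #include <cstddef>
  #include <cstdio>
  #include <string>

  int main() {
    const std::size_t num_rows = 3;               // stand-in for unflushed_rows_
    std::string key_lengths(num_rows, '\x04');    // one length byte (4) per row
    std::string keys(num_rows * 4, '\0');         // a "\0\0\0\0" key per row
    printf("key-lengths block: %zu bytes, keys block: %zu bytes\n",
           key_lengths.size(), keys.size());
    return 0;
  }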

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/hdfs-sequence-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h
index 7f6a888..3d72926 100644
--- a/be/src/exec/hdfs-sequence-table-writer.h
+++ b/be/src/exec/hdfs-sequence-table-writer.h
@@ -36,6 +36,66 @@ class RuntimeState;
 struct StringValue;
 struct OutputPartition;
 
+/// Sequence files are flat files consisting of binary key/value pairs. Essentially there
+/// are 3 different formats for sequence files depending on the 'compression_codec' and
+/// 'seq_compression_mode' query options:
+/// - Uncompressed sequence file format
+/// - Record-compressed sequence file format
+/// - Block-compressed sequence file format
+/// All of them share a common header described below.
+///
+/// Sequence File Header
+/// --------------------
+/// - version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number
+///   (e.g. SEQ4 or SEQ6)
+/// - keyClassName - key class
+/// - valueClassName - value class
+/// - compression - A boolean which specifies if compression is turned on for keys/values
+///   in this file.
+/// - blockCompression - A boolean which specifies if block-compression is turned on for
+///   keys/values in this file.
+/// - compression codec - compression codec class which is used for compression of keys
+///   and/or values (if compression is enabled).
+/// - metadata - SequenceFile.Metadata for this file.
+/// - sync - A 16 byte sync marker to denote end of the header.
+///
+/// Uncompressed Sequence File Format
+/// ---------------------------------
+/// - Header
+/// - Record
+///   - Record length
+///   - Key length
+///   - Key
+///   - Value
+/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so.
+///
+/// Record-Compressed Sequence File Format
+/// --------------------------------------
+/// - Header
+/// - Record
+///   - Record length
+///   - Key length
+///   - Key
+///   - Compressed Value
+/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every few 100 bytes or so.
+///
+/// Block-Compressed Sequence File Format
+/// -------------------------------------
+/// - Header
+/// - Record Block
+///   - Uncompressed number of records in the block
+///   - Compressed key-lengths block-size
+///   - Compressed key-lengths block
+///   - Compressed keys block-size
+///   - Compressed keys block
+///   - Compressed value-lengths block-size
+///   - Compressed value-lengths block
+///   - Compressed values block-size
+///   - Compressed values block
+/// - "\xFF\xFF\xFF\xFF" followed by a sync-marker every block.
+/// The compressed blocks of key lengths and value lengths consist of the actual lengths
+/// of individual keys/values encoded in zero-compressed integer format.
+
 /// Consumes rows and outputs the rows into a sequence file in HDFS
 /// Output is buffered to fill sequence file blocks.
 class HdfsSequenceTableWriter : public HdfsTableWriter {
@@ -67,7 +127,8 @@ class HdfsSequenceTableWriter : public HdfsTableWriter {
   /// writes the SEQ file header to HDFS
   Status WriteFileHeader();
 
-  /// writes the contents of out_ as a single compressed block
+  /// writes the contents of out_value_lengths_block_ and out_ as a single
+  /// block-compressed record.
   Status WriteCompressedBlock();
 
   /// writes the tuple row to the given buffer; separates fields by field_delim_,
@@ -88,6 +149,10 @@ class HdfsSequenceTableWriter : public HdfsTableWriter {
   /// buffer which holds accumulated output
   WriteStream out_;
 
+  /// buffer which holds accumulated value-lengths output (used with block-compressed
+  /// sequence files)
+  WriteStream out_value_lengths_block_;
+
   /// Temporary Buffer for a single row
   WriteStream row_buf_;
 
@@ -119,10 +184,12 @@ class HdfsSequenceTableWriter : public HdfsTableWriter {
   /// A -1 infront of the sync marker, used in decompressed formats
   std::string neg1_sync_marker_;
 
+  /// Name of java class to use when reading the keys
+  static const char* KEY_CLASS_NAME;
   /// Name of java class to use when reading the values
   static const char* VALUE_CLASS_NAME;
   /// Magic characters used to identify the file type
-  static uint8_t SEQ6_CODE[4];
+  static const uint8_t SEQ6_CODE[4];
 };
 
 } // namespace impala
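
To make the uncompressed record layout documented in the new header comment
concrete, the snippet below lays out a single record the same way: a big-endian
record length, a key length of 4, the four-byte zero key, then the value length
as a one-byte VLong followed by the row text. The helper names and the
byte-vector buffer are invented for this sketch and are not Impala's WriteStream
API; it also assumes the value is short enough for a one-byte VLong.

  // record_sketch.cc: lays out one uncompressed sequence-file record.
  // Standalone illustration; not Impala's WriteStream code.
  #include <cstdint>
  #include <cstdio>
  #include <string>
  #include <vector>

  // Java's DataOutput writes ints big-endian, which is what sequence files use.
  static void PutBigEndianInt(std::vector<uint8_t>* out, uint32_t v) {
    for (int shift = 24; shift >= 0; shift -= 8) {
      out->push_back(static_cast<uint8_t>(v >> shift));
    }
  }

  // Assumes value.size() <= 127 so its VLong length prefix fits in one byte.
  static std::vector<uint8_t> BuildUncompressedRecord(const std::string& value) {
    std::vector<uint8_t> rec;
    const uint32_t key_len = 4;    // serialized zero-length BytesWritable
    const uint32_t vlong_len = 1;  // one-byte VLong for the value length
    // Record length covers the key and value portions, not the length fields.
    PutBigEndianInt(&rec, static_cast<uint32_t>(key_len + vlong_len + value.size()));
    PutBigEndianInt(&rec, key_len);                     // key length
    rec.insert(rec.end(), {0, 0, 0, 0});                // "\0\0\0\0" key
    rec.push_back(static_cast<uint8_t>(value.size()));  // value length as VLong
    rec.insert(rec.end(), value.begin(), value.end());  // the row text
    return rec;
  }

  int main() {
    std::vector<uint8_t> rec = BuildUncompressedRecord("1,abc,2.5\n");
    printf("record is %zu bytes\n", rec.size());
    return 0;
  }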

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/read-write-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util-test.cc b/be/src/exec/read-write-util-test.cc
index 0f9c6ae..e4448de 100644
--- a/be/src/exec/read-write-util-test.cc
+++ b/be/src/exec/read-write-util-test.cc
@@ -56,6 +56,63 @@ TEST(ReadWriteUtil, BigEndian) {
   TestBigEndian<uint64_t>(0xffffffffffffff);
 }
 
+TEST(ReadWriteUtil, ZeroCompressedLongRequiredBytes) {
+  // Small longs stored in 1 byte
+  for (int64_t val = -112; val <= 127; val++) {
+    EXPECT_EQ(1, ReadWriteUtil::VLongRequiredBytes(val));
+  }
+  // Small longs stored in 2 bytes
+  for (int64_t val = -128; val < -112; val++) {
+    EXPECT_EQ(2, ReadWriteUtil::VLongRequiredBytes(val));
+  }
+  // Positive longs stored in 3-9 bytes
+  int64_t val = 0x7000ab00cd00ef00;
+  for (int sh = 0; sh <= 6; sh++) {
+    EXPECT_EQ(9 - sh, ReadWriteUtil::VLongRequiredBytes(val));
+    val = val >> 8;
+  }
+  // Negative longs stored 3-9 bytes
+  val = 0x8000ab00cd00ef00;
+  for (int sh = 0; sh <= 6; sh++) {
+    EXPECT_EQ(9 - sh, ReadWriteUtil::VLongRequiredBytes(val));
+    val = val >> 8;
+  }
+  //Max/min long is stored in 9 bytes
+  EXPECT_EQ(9, ReadWriteUtil::VLongRequiredBytes(0x7fffffffffffffff));
+  EXPECT_EQ(9, ReadWriteUtil::VLongRequiredBytes(0x8000000000000000));
+}
+
+void TestPutGetZeroCompressedLong(int64_t val) {
+  uint8_t buffer[9];
+  int64_t read_val;
+  int64_t num_bytes = ReadWriteUtil::PutVLong(val, buffer);
+  int64_t read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val);
+  EXPECT_EQ(read_bytes, num_bytes);
+  EXPECT_EQ(read_val, val);
+}
+
+TEST(ReadWriteUtil, ZeroCompressedLong) {
+  //1 byte longs
+  for (int64_t val = -128; val <= 127; val++) {
+    TestPutGetZeroCompressedLong(val);
+  }
+  //2+ byte positive longs
+  int64_t val = 0x70100000200000ab;
+  for (int sh = 0; sh <= 6; sh++) {
+    TestPutGetZeroCompressedLong(val);
+    val = val >> 8;
+  }
+  //2+ byte negative longs
+  val = 0x80100000200000ab;
+  for (int sh = 0; sh <= 6; sh++) {
+    TestPutGetZeroCompressedLong(val);
+    val = val >> 8;
+  }
+  //Max/min long
+  TestPutGetZeroCompressedLong(0x7fffffffffffffff);
+  TestPutGetZeroCompressedLong(0x8000000000000000);
+}
+
 }
 
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/exec/read-write-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util.h b/be/src/exec/read-write-util.h
index c384839..84d41dd 100644
--- a/be/src/exec/read-write-util.h
+++ b/be/src/exec/read-write-util.h
@@ -60,11 +60,11 @@ class ReadWriteUtil {
   static int GetVInt(uint8_t* buf, int32_t* vint);
 
   /// Writes a variable-length Long or int value to a byte buffer.
-  /// Returns the number of bytes written
+  /// Returns the number of bytes written.
   static int64_t PutVLong(int64_t val, uint8_t* buf);
   static int64_t PutVInt(int32_t val, uint8_t* buf);
 
-  /// returns size of the encoded long value, not including the 1 byte for length
+  /// Returns the size of the encoded long value, including the 1 byte for the length.
   static int VLongRequiredBytes(int64_t val);
 
   /// Read a variable-length Long value from a byte buffer starting at the specified
@@ -211,41 +211,63 @@ inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong)
   return len;
 }
 
+// Returns the size of the encoded long value, including the 1 byte used for the length
+// when val < -112 or val > 127.
 inline int ReadWriteUtil::VLongRequiredBytes(int64_t val) {
-  // returns size of the encoded long value, not including the 1 byte for length
-  if (val & 0xFF00000000000000llu) return 8;
-  if (val & 0x00FF000000000000llu) return 7;
-  if (val & 0x0000FF0000000000llu) return 6;
-  if (val & 0x000000FF00000000llu) return 5;
-  if (val & 0x00000000FF000000llu) return 4;
-  if (val & 0x0000000000FF0000llu) return 3;
-  if (val & 0x000000000000FF00llu) return 2;
-  // Values between -112 and 127 are stored using 1 byte,
-  // values between -127 and -112 are stored using 2 bytes
-  // See ReadWriteUtil::DecodeVIntSize for this case
-  if (val < -112) return 2;
-  return 1;
+  if (val >= -112 && val <= 127) return 1;
+  // If 'val' is negative, take the one's complement.
+  if (val < 0) val = ~val;
+  return 9 - __builtin_clzll(val)/8;
 }
 
+// Serializes 'val' to a binary stream with zero-compressed encoding. For -112<=val<=127,
+// only one byte is used with the actual value. For other values of 'val', the first byte
+// value indicates whether the long is positive or negative, and the number of bytes that
+// follow. If the first byte value v is between -113 and -120, the following long is
+// positive and -(v+112) bytes follow. If the first byte value v is between -121 and
+// -128, the following long is negative and -(v+120) bytes follow. Bytes are stored in
+// high-non-zero-byte-first order. Returns the
+// number of bytes written.
+// For more information, see the documentation for 'WritableUtils.writeVLong()' method:
+// https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/WritableUtils.html
 inline int64_t ReadWriteUtil::PutVLong(int64_t val, uint8_t* buf) {
   int64_t num_bytes = VLongRequiredBytes(val);
 
   if (num_bytes == 1) {
+    DCHECK(val >= -112 && val <= 127);
     // store the value itself instead of the length
     buf[0] = static_cast<int8_t>(val);
     return 1;
   }
 
   // This is how we encode the length for a length less than or equal to 8
-  buf[0] = -119 + num_bytes;
+  DCHECK_GE(num_bytes, 2);
+  DCHECK_LE(num_bytes, 9);
+  if (val < 0) {
+    DCHECK_LT(val, -112);
+    // The first byte in 'buf' should contain a value between -121 and -128 that makes the
+    // following condition true: -(buf[0] + 120) == num_bytes - 1.
+    // Note that 'num_bytes' includes the 1 extra byte for length.
+    buf[0] = -(num_bytes + 119);
+    // If 'val' is negative, take the one's complement.
+    // See the source code for WritableUtils.writeVLong() method:
+    // https://hadoop.apache.org/docs/r2.7.2/api/src-html/org/apache/hadoop/io/
+    // WritableUtils.html#line.271
+    val = ~val;
+  } else {
+    DCHECK_GT(val, 127);
+    // The first byte in 'buf' should contain a value between -113 and -120 that makes the
+    // following condition true: -(buf[0] + 112) == num_bytes - 1.
+    // Note that 'num_bytes' includes the 1 extra byte for length.
+    buf[0] = -(num_bytes + 111);
+  }
 
-  // write to buffer in reversed endianness
-  for (int i = 0; i < num_bytes; ++i) {
-    buf[i+1] = (val >> (8 * (num_bytes - i - 1))) & 0xFF;
+  // write to the buffer in big-endian byte order
+  for (int i = 1; i < num_bytes; ++i) {
+    buf[i] = (val >> (8 * (num_bytes - i - 1))) & 0xFF;
   }
 
-  // +1 for the length byte
-  return num_bytes + 1;
+  return num_bytes;
 }
 
 inline int64_t ReadWriteUtil::PutVInt(int32_t val, uint8_t* buf) {
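
For completeness, the decode side of the encoding documented above can be
sketched in a few lines. This is an independent illustration written for this
note, not ReadWriteUtil::GetVLong; the names are invented.

  // read_vlong_sketch.cc: decode-side counterpart to the zero-compressed
  // encoding. Standalone illustration only.
  #include <cstdint>
  #include <cstdio>

  // Decodes a zero-compressed long from 'buf'; returns the number of bytes read.
  static int ReadVLong(const uint8_t* buf, int64_t* val) {
    const int8_t first = static_cast<int8_t>(buf[0]);
    if (first >= -112) {           // single-byte value
      *val = first;
      return 1;
    }
    const bool negative = first < -120;
    const int payload = negative ? -(first + 120) : -(first + 112);
    int64_t v = 0;
    for (int i = 0; i < payload; ++i) {
      v = (v << 8) | buf[1 + i];   // big-endian payload
    }
    *val = negative ? ~v : v;      // undo the one's complement
    return payload + 1;
  }

  int main() {
    const uint8_t encoded[] = {0x8d, 0x0f, 0x42, 0x40};  // 1000000 in this encoding
    int64_t v = 0;
    int n = ReadVLong(encoded, &v);
    printf("%d bytes -> %lld\n", n, static_cast<long long>(v));
    return 0;
  }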

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/util/compress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index 429ae66..efa39bf 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -204,13 +204,27 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
     int64_t input_length, const uint8_t* input, int64_t *output_length,
     uint8_t** output) {
   DCHECK_GE(input_length, 0);
-  // Hadoop uses a block compression scheme on top of snappy.  First there is
-  // an integer which is the size of the decompressed data followed by a
-  // sequence of compressed blocks each preceded with an integer size.
-  // For testing purposes we are going to generate two blocks.
-  int64_t block_size = input_length / 2;
-  size_t length = snappy::MaxCompressedLength(block_size) * 2;
-  length += 3 * sizeof (int32_t);
+  // Hadoop uses a block compression scheme on top of snappy. The layout is as follows:
+  // - size of the entire decompressed data (4 bytes)
+  // - size of the 1st compressed block (4 bytes)
+  // - 1st compressed block
+  // - size of the 2nd compressed block (4 bytes)
+  // - 2nd compressed block
+  // ...
+  // For testing purposes we are going to generate two blocks if input_length >= 4K.
+  vector<int64_t> block_sizes;
+  size_t length;
+  if (input_length == 0) {
+    length = sizeof (int32_t);
+  } else if (input_length < 4 * 1024) {
+    block_sizes.push_back(input_length);
+    length = snappy::MaxCompressedLength(block_sizes[0]) + 2 * sizeof (int32_t);
+  } else {
+    block_sizes.push_back(input_length / 2);
+    block_sizes.push_back(input_length - block_sizes[0]);
+    length = snappy::MaxCompressedLength(block_sizes[0]) +
+        snappy::MaxCompressedLength(block_sizes[1]) + 3 * sizeof (int32_t);
+  }
   DCHECK(!output_preallocated || length <= *output_length);
 
   if (output_preallocated) {
@@ -222,13 +236,12 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
   }
 
   uint8_t* outp = out_buffer_;
-  uint8_t* sizep;
   ReadWriteUtil::PutInt(outp, static_cast<uint32_t>(input_length));
   outp += sizeof (int32_t);
-  while (input_length > 0) {
+  for (int64_t block_size: block_sizes) {
     // TODO: should this be a while or a do-while loop? Check what Hadoop does.
     // Point at the spot to store the compressed size.
-    sizep = outp;
+    uint8_t* sizep = outp;
     outp += sizeof (int32_t);
     size_t size;
     snappy::RawCompress(reinterpret_cast<const char*>(input),
@@ -236,8 +249,8 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
 
     ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size));
     input += block_size;
-    input_length -= block_size;
     outp += size;
+    DCHECK_LE(outp - out_buffer_, length);
   }
 
   *output = out_buffer_;
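
To make the layout comment above easier to follow, here is a standalone sketch
that walks the Hadoop block-snappy framing (a 4-byte total uncompressed size,
then repeated 4-byte-size-prefixed compressed blocks) and inflates each block
with the stock snappy API. It is written for this note and is not Impala's
decompressor; compiled against libsnappy, calling it on ProcessBlock() output
should recover the original buffer.

  // hadoop_snappy_sketch.cc: walks the Hadoop block-snappy framing.
  // Standalone illustration, not Impala's SnappyBlockDecompressor.
  #include <snappy.h>
  #include <cstddef>
  #include <cstdint>
  #include <string>

  static uint32_t ReadBigEndianInt(const uint8_t* p) {
    return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
           (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
  }

  // 'out' is assumed empty on entry. Returns false if the framing is malformed
  // or a block fails to decompress.
  static bool DecodeHadoopSnappy(const uint8_t* buf, size_t len, std::string* out) {
    if (len < 4) return false;
    const uint32_t total_uncompressed = ReadBigEndianInt(buf);
    size_t pos = 4;
    while (out->size() < total_uncompressed) {
      if (pos + 4 > len) return false;
      const uint32_t block_len = ReadBigEndianInt(buf + pos);
      pos += 4;
      if (pos + block_len > len) return false;
      std::string block;
      if (!snappy::Uncompress(reinterpret_cast<const char*>(buf + pos),
                              block_len, &block)) {
        return false;
      }
      out->append(block);
      pos += block_len;
    }
    return out->size() == total_uncompressed;
  }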

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 7709bb1..1f84bad 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -73,8 +73,15 @@ class DecompressorTest : public ::testing::Test {
       DecompressInsufficientOutputBuffer(compressor.get(), decompressor.get(),
           sizeof(input_), input_);
     } else {
-      CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_),
+      CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_), input_);
+      // Test with odd-length input (to test the calculation of block-sizes in
+      // SnappyBlockCompressor)
+      CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_) - 1,
           input_);
+      // Test with input length of 1024 (to test SnappyBlockCompressor with a single
+      // block)
+      CompressAndDecompress(compressor.get(), decompressor.get(), 1024, input_);
+      // Test with empty input
       if (format != THdfsCompression::BZIP2) {
         CompressAndDecompress(compressor.get(), decompressor.get(), 0, NULL);
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
index fe4d829..528b83e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/seq-writer.test
@@ -1,8 +1,5 @@
 ====
 ---- QUERY
-drop table if exists __seq_write;
-====
----- QUERY
 SET COMPRESSION_CODEC=NONE;
 SET ALLOW_UNSUPPORTED_FORMATS=1;
 SET SEQ_COMPRESSION_MODE=BLOCK;
@@ -92,5 +89,192 @@ select * from __seq_write;
 INT,STRING,DOUBLE
 ====
 ---- QUERY
-drop table __seq_write;
+# IMPALA-3079: Create a table containing larger seq files with NONE+RECORD and then read
+# it back
+SET COMPRESSION_CODEC=NONE;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_none_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_none_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_none_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with DEFAULT+RECORD and then
+# read it back
+SET COMPRESSION_CODEC=DEFAULT;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_def_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_def_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_def_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+RECORD and
+# then read it back
+SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snapb_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snapb_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snapb_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY+RECORD and then read
+# it back
+SET COMPRESSION_CODEC=SNAPPY;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snap_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snap_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snap_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with GZIP+RECORD and then read
+# it back
+SET COMPRESSION_CODEC=GZIP;
+SET SEQ_COMPRESSION_MODE=RECORD;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_gzip_rec like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_gzip_rec partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_gzip_rec;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with NONE+BLOCK and then read it
+# back
+SET COMPRESSION_CODEC=NONE;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_none_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_none_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_none_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with DEFAULT+BLOCK and then read
+# it back
+SET COMPRESSION_CODEC=DEFAULT;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_def_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_def_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_def_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY_BLOCKED+BLOCK and
+# then read it back
+SET COMPRESSION_CODEC=SNAPPY_BLOCKED;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snapb_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snapb_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snapb_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with SNAPPY+BLOCK and then read
+# it back
+SET COMPRESSION_CODEC=SNAPPY;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_snap_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_snap_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_snap_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
+====
+---- QUERY
+# IMPALA-3079: Create a table containing larger seq files with GZIP+BLOCK and then read it
+# back
+SET COMPRESSION_CODEC=GZIP;
+SET SEQ_COMPRESSION_MODE=BLOCK;
+SET ALLOW_UNSUPPORTED_FORMATS=1;
+create table store_sales_seq_gzip_block like tpcds_parquet.store_sales
+stored as SEQUENCEFILE;
+insert into store_sales_seq_gzip_block partition(ss_sold_date_sk)
+select * from tpcds_parquet.store_sales
+where ss_sold_date_sk between 2450816 and 2451200;
+====
+---- QUERY
+select count(*) from store_sales_seq_gzip_block;
+---- RESULTS
+37999
+---- TYPES
+BIGINT
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59b2db6b/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 654ea48..36dc427 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -144,18 +144,43 @@ class TestTableWriters(ImpalaTestSuite):
         (v.get_value('table_format').file_format =='text' and
         v.get_value('table_format').compression_codec == 'none'))
 
-  def test_seq_writer(self, vector):
-    # TODO debug this test, same as seq writer.
-    # This caused by a zlib failure. Suspected cause is too small a buffer
-    # passed to zlib for compression; similar to IMPALA-424
-    pytest.skip()
-    self.run_test_case('QueryTest/seq-writer', vector)
+  def test_seq_writer(self, vector, unique_database):
+    self.run_test_case('QueryTest/seq-writer', vector, unique_database)
+
+  def test_seq_writer_hive_compatibility(self, vector, unique_database):
+    self.client.execute('set ALLOW_UNSUPPORTED_FORMATS=1')
+    # Write sequence files with different compression codec/compression mode combinations
+    # and then read them back in Impala and Hive.
+    # Note that we don't test snappy here as the snappy codec used by Impala does not seem
+    # to be fully compatible with the snappy codec used by Hive.
+    for comp_codec, comp_mode in [('NONE', 'RECORD'), ('NONE', 'BLOCK'),
+                                  ('DEFAULT', 'RECORD'), ('DEFAULT', 'BLOCK'),
+                                  ('GZIP', 'RECORD'), ('GZIP', 'BLOCK')]:
+      table_name = '%s.seq_tbl_%s_%s' % (unique_database, comp_codec, comp_mode)
+      self.client.execute('set COMPRESSION_CODEC=%s' % comp_codec)
+      self.client.execute('set SEQ_COMPRESSION_MODE=%s' % comp_mode)
+      self.client.execute('create table %s like functional.zipcode_incomes stored as '
+          'sequencefile' % table_name)
+      # Write sequence file of size greater than 4K
+      self.client.execute('insert into %s select * from functional.zipcode_incomes where '
+          'zip >= "5"' % table_name)
+      # Write sequence file of size less than 4K
+      self.client.execute('insert into %s select * from functional.zipcode_incomes where '
+          'zip="00601"' % table_name)
+      # Read it back in Impala
+      output = self.client.execute('select count(*) from %s' % table_name)
+      assert '16541' == output.get_data()
+      # Read it back in Hive
+      output = self.run_stmt_in_hive('select count(*) from %s' % table_name)
+      assert '16541' == output.split('\n')[1]
 
   def test_avro_writer(self, vector):
     self.run_test_case('QueryTest/avro-writer', vector)
 
   def test_text_writer(self, vector):
-    # TODO debug this test, same as seq writer.
+    # TODO debug this test.
+    # This caused by a zlib failure. Suspected cause is too small a buffer
+    # passed to zlib for compression; similar to IMPALA-424
     pytest.skip()
     self.run_test_case('QueryTest/text-writer', vector)