You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/13 06:09:38 UTC

[07/10] incubator-impala git commit: IMPALA-1578: fix text scanner to handle "\r\n" delimiters split across blocks

IMPALA-1578: fix text scanner to handle "\r\n" delimiters split across blocks

This patch modifies HdfsTextScanner to specifically check for split
"\r\n" delimiters when the scan range ends with '\r'. If there does
turn out to be a split delimiter, the next tuple is considered the
responsibility of the next scan range's scanner, as if the delimiter
appeared fully in the second scan range. This should not affect the
overall performance characteristics of the text scanner since it
already must do a remote read past the end of the scan range to read
the last tuple.

Change-Id: Id42b441674bb21517ad2788b99942a4b5dc55420
Reviewed-on: http://gerrit.cloudera.org:8080/2803
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9174dee3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9174dee3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9174dee3

Branch: refs/heads/master
Commit: 9174dee395bd1391b54224804aaab18254910cd0
Parents: 2b61ae7
Author: Skye Wanderman-Milne <sk...@cloudera.com>
Authored: Thu May 12 16:12:03 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:36 2016 -0700

----------------------------------------------------------------------
 be/src/exec/delimited-text-parser.cc |  5 --
 be/src/exec/hdfs-text-scanner.cc     | 84 +++++++++++++++++++++++++++----
 be/src/exec/hdfs-text-scanner.h      | 21 ++++++++
 tests/query_test/test_scanners.py    | 77 ++++++++++++++++++++++++++++
 4 files changed, 171 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174dee3/be/src/exec/delimited-text-parser.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc
index f1185a4..950eae4 100644
--- a/be/src/exec/delimited-text-parser.cc
+++ b/be/src/exec/delimited-text-parser.cc
@@ -284,11 +284,6 @@ restart:
     if (num_escape_chars % 2 != 0) goto restart;
   }
 
-  if (tuple_start == len - 1 && buffer_start[tuple_start] == '\r') {
-    // If \r is the last char we need to wait to see if the next one is \n or not.
-    last_row_delim_offset_ = 0;
-    return -1;
-  }
   if (tuple_start < len && buffer_start[tuple_start] == '\n' &&
       buffer_start[tuple_start - 1] == '\r') {
     // We have \r\n, move to the next character.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174dee3/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index dee34dc..dd0d524 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -249,6 +249,20 @@ Status HdfsTextScanner::ResetScanner() {
 Status HdfsTextScanner::FinishScanRange() {
   if (scan_node_->ReachedLimit()) return Status::OK();
 
+  DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
+  bool split_delimiter;
+  RETURN_IF_ERROR(CheckForSplitDelimiter(&split_delimiter));
+  if (split_delimiter) {
+    // If the scan range ends on the '\r' of a "\r\n", the next tuple is considered part
+    // of the next scan range. Nothing to do since we already fully parsed the previous
+    // tuple.
+    DCHECK(!delimited_text_parser_->HasUnfinishedTuple());
+    DCHECK(partial_tuple_empty_);
+    DCHECK(boundary_column_.Empty());
+    DCHECK(boundary_row_.Empty());
+    return Status::OK();
+  }
+
   // For text we always need to scan past the scan range to find the next delimiter
   while (true) {
     bool eosr = true;
@@ -576,29 +590,47 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
   if (num_rows_to_skip > 0) {
     int num_skipped_rows = 0;
     *tuple_found = false;
-    while (true) {
-      bool eosr = false;
-      RETURN_IF_ERROR(FillByteBuffer(&eosr));  // updates byte_buffer_read_size_
+    bool eosr = false;
+    // Offset maybe not point to a tuple boundary, skip ahead to the first tuple start in
+    // this scan range (if one exists).
+    do {
+      RETURN_IF_ERROR(FillByteBuffer(&eosr));
 
       delimited_text_parser_->ParserReset();
       SCOPED_TIMER(parse_delimiter_timer_);
       int next_tuple_offset = 0;
+      int bytes_left = byte_buffer_read_size_;
       while (num_skipped_rows < num_rows_to_skip) {
         next_tuple_offset = delimited_text_parser_->FindFirstInstance(byte_buffer_ptr_,
-          byte_buffer_read_size_);
+          bytes_left);
         if (next_tuple_offset == -1) break;
         byte_buffer_ptr_ += next_tuple_offset;
-        byte_buffer_read_size_ -= next_tuple_offset;
+        bytes_left -= next_tuple_offset;
         ++num_skipped_rows;
       }
 
-      if (next_tuple_offset == -1) {
-        // Didn't find enough new tuples in this buffer, continue with the next one.
-        if (!eosr) continue;
-      } else {
-        *tuple_found = true;
+      if (next_tuple_offset != -1) *tuple_found = true;
+    } while (!*tuple_found && !eosr);
+
+    // Special case: if the first delimiter is at the end of the current buffer, it's
+    // possible it's a split "\r\n" delimiter.
+    if (*tuple_found && byte_buffer_ptr_ == byte_buffer_end_) {
+      bool split_delimiter;
+      RETURN_IF_ERROR(CheckForSplitDelimiter(&split_delimiter));
+      if (split_delimiter) {
+        if (eosr) {
+          // Split delimiter at the end of the scan range. The next tuple is considered
+          // part of the next scan range, so we report no tuple found.
+          *tuple_found = false;
+        } else {
+          // Split delimiter at the end of the current buffer, but not eosr. Advance to
+          // the correct position in the next buffer.
+          RETURN_IF_ERROR(FillByteBuffer(&eosr));
+          DCHECK_GT(byte_buffer_read_size_, 0);
+          DCHECK_EQ(*byte_buffer_ptr_, '\n');
+          byte_buffer_ptr_ += 1;
+        }
       }
-      break;
     }
     if (num_rows_to_skip > 1 && num_skipped_rows != num_rows_to_skip) {
       DCHECK(!*tuple_found);
@@ -613,6 +645,36 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
   return Status::OK();
 }
 
+Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
+  DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
+  *split_delimiter = false;
+
+  // Nothing in buffer
+  if (byte_buffer_read_size_ == 0) return Status::OK();
+
+  // If the line delimiter is "\n" (meaning we also accept "\r" and "\r\n" as delimiters)
+  // and the current buffer ends with '\r', this could be a "\r\n" delimiter.
+  bool split_delimiter_possible = context_->partition_descriptor()->line_delim() == '\n'
+      && *(byte_buffer_end_ - 1) == '\r';
+  if (!split_delimiter_possible) return Status::OK();
+
+  // The '\r' may be escaped. If it's not the text parser will report a complete tuple.
+  if (delimited_text_parser_->HasUnfinishedTuple()) return Status::OK();
+
+  // Peek ahead one byte to see if the '\r' is followed by '\n'.
+  Status status;
+  uint8_t* next_byte;
+  int64_t out_len;
+  stream_->GetBytes(1, &next_byte, &out_len, &status, /*peek*/ true);
+  RETURN_IF_ERROR(status);
+
+  // No more bytes after current buffer
+  if (out_len == 0) return Status::OK();
+
+  *split_delimiter = *next_byte == '\n';
+  return Status::OK();
+}
+
 // Codegen for materializing parsed data into tuples.  The function WriteCompleteTuple is
 // codegen'd using the IRBuilder for the specific tuple description.  This function
 // is then injected into the cross-compiled driving function, WriteAlignedTuples().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174dee3/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 74ce2e7..03b8343 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -27,6 +27,20 @@ struct HdfsFileDesc;
 
 /// HdfsScanner implementation that understands text-formatted records.
 /// Uses SSE instructions, if available, for performance.
+///
+/// Splitting text files:
+/// This scanner handles text files split across multiple blocks/scan ranges. Note that
+/// the split can occur anywhere in the file, e.g. in the middle of a row. Each scanner
+/// starts materializing tuples right after the first row delimiter found in the scan
+/// range, and stops at the first row delimiter occuring past the end of the scan
+/// range. If no delimiter is found in the scan range, the scanner doesn't materialize
+/// anything. This scheme ensures that every row is materialized by exactly one scanner.
+///
+/// A special case is a "\r\n" row delimiter split across two scan ranges. (When the row
+/// delimiter is '\n', we also consider '\r' and "\r\n" row delimiters.) In this case, the
+/// delimiter is considered part of the second scan range, i.e., the first scan range's
+/// scanner is responsible for the tuple directly before it, and the second scan range's
+/// scanner for the tuple directly after it.
 class HdfsTextScanner : public HdfsScanner {
  public:
   HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state);
@@ -115,6 +129,13 @@ class HdfsTextScanner : public HdfsScanner {
   Status DecompressBufferStream(int64_t bytes_to_read, uint8_t** decompressed_buffer,
       int64_t* decompressed_len, bool *eosr);
 
+  /// Checks if the current buffer ends with a row delimiter spanning this and the next
+  /// buffer (i.e. a "\r\n" delimiter). Does not modify byte_buffer_ptr_, etc. Always
+  /// returns false if the table's row delimiter is not '\n'. This can only be called
+  /// after the buffer has been fully parsed, i.e. when byte_buffer_ptr_ ==
+  /// byte_buffer_end_.
+  Status CheckForSplitDelimiter(bool* split_delimiter);
+
   /// Prepends field data that was from the previous file buffer (This field straddled two
   /// file buffers). 'data' already contains the pointer/len from the current file buffer,
   /// boundary_column_ contains the beginning of the data from the previous file buffer.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174dee3/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 6936765..216d3df 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -9,6 +9,7 @@
 import logging
 import pytest
 import random
+import tempfile
 from copy import deepcopy
 from subprocess import call, check_call
 
@@ -417,6 +418,82 @@ class TestTextScanRangeLengths(ImpalaTestSuite):
       result = self.client.execute("select count(*) from " + t)
       assert result.data == expected_result.data
 
+# Tests behavior of split "\r\n" delimiters.
+class TestTextSplitDelimiters(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTextSplitDelimiters, cls).add_test_dimensions()
+    cls.TestMatrix.add_constraint(lambda v:\
+        v.get_value('table_format').file_format == 'text' and\
+        v.get_value('table_format').compression_codec == 'none')
+
+  def test_text_split_delimiters(self, vector, unique_database):
+    """Creates and queries a datafile that exercises interesting edge cases around split
+    "\r\n" delimiters. The data file contains the following 4-byte scan ranges:
+
+    abc\r   First scan range, ends with split \r\n
+            - materializes (abc)
+    \nde\r  Initial delimiter found, scan range ends with split \r\n
+            - materializes (de)
+    \nfg\r  Initial delimiter found, scan range ends with \r
+            - materializes (fg),(hij)
+    hij\r   Initial delimiter is \r at end
+            - materializes (klm)
+    klm\r   Initial delimiter is split \r\n
+            - materializes nothing
+    \nno\r  Final scan range, initial delimiter found, ends with \r
+            - materializes (no)
+    """
+    DATA = "abc\r\nde\r\nfg\rhij\rklm\r\nno\r"
+    max_scan_range_length = 4
+    expected_result = ['abc', 'de', 'fg', 'hij', 'klm', 'no']
+
+    self._create_and_query_test_table(
+      vector, unique_database, DATA, max_scan_range_length, expected_result)
+
+  def test_text_split_across_buffers_delimiter(self, vector, unique_database):
+    """Creates and queries a datafile that exercises a split "\r\n" across io buffers (but
+    within a single scan range). We use a 32MB file and 16MB scan ranges, so there are two
+    scan ranges of two io buffers each. The first scan range exercises a split delimiter
+    in the main text parsing algorithm. The second scan range exercises correctly
+    identifying a split delimiter as the first in a scan range."""
+    DEFAULT_IO_BUFFER_SIZE = 8 * 1024 * 1024
+    data = ('a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n" + # first scan range
+            'b' * (DEFAULT_IO_BUFFER_SIZE - 3) + "\r\n" +
+            'a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n" +     # second scan range
+            'b' * (DEFAULT_IO_BUFFER_SIZE - 1))
+    assert len(data) == DEFAULT_IO_BUFFER_SIZE * 4
+
+    max_scan_range_length = DEFAULT_IO_BUFFER_SIZE * 2
+    expected_result = data.split("\r\n")
+
+    self._create_and_query_test_table(
+      vector, unique_database, data, max_scan_range_length, expected_result)
+
+  def _create_and_query_test_table(self, vector, unique_database, data,
+        max_scan_range_length, expected_result):
+    TABLE_NAME = "test_text_split_delimiters"
+    qualified_table_name = "%s.%s" % (unique_database, TABLE_NAME)
+    location = get_fs_path("/test-warehouse/%s_%s" % (unique_database, TABLE_NAME))
+    query = "create table %s (s string) location '%s'" % (qualified_table_name, location)
+    self.client.execute(query)
+
+    with tempfile.NamedTemporaryFile() as f:
+      f.write(data)
+      f.flush()
+      check_call(['hadoop', 'fs', '-copyFromLocal', f.name, location])
+    self.client.execute("refresh %s" % qualified_table_name);
+
+    vector.get_value('exec_option')['max_scan_range_length'] = max_scan_range_length
+    query = "select * from %s" % qualified_table_name
+    result = self.execute_query_expect_success(
+      self.client, query, vector.get_value('exec_option'))
+
+    assert sorted(result.data) == sorted(expected_result)
 
 # Test for IMPALA-1740: Support for skip.header.line.count
 class TestTextScanRangeLengths(ImpalaTestSuite):