You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/28 19:03:25 UTC

incubator-quickstep git commit: QUICKSTEP-46 fixed

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 1effc794e -> 7f0067b79


QUICKSTEP-46 fixed

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7f0067b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7f0067b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7f0067b7

Branch: refs/heads/master
Commit: 7f0067b7913d8f2b68e8ac771fb9a87090d773e9
Parents: 1effc79
Author: tarun <ta...@gmail.com>
Authored: Mon Oct 10 20:29:42 2016 -0500
Committer: tarunbansal <ta...@gmail.com>
Committed: Fri Oct 28 10:02:34 2016 -0500

----------------------------------------------------------------------
 relational_operators/CMakeLists.txt             | 14 ++++
 relational_operators/TextScanOperator.cpp       | 73 ++++++++++++++++----
 relational_operators/TextScanOperator.hpp       | 19 ++++-
 .../tests/text_scan_faulty_golden_output.txt    |  5 ++
 .../tests/text_scan_faulty_input.txt            |  4 ++
 5 files changed, 99 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 8dd65d0..0735bce 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -784,3 +784,17 @@ add_test(TextScanOperator_unittest
          ${TEXT_SCAN_INPUT_FILE}
          ${TEXT_SCAN_GOLDEN_OUTPUT_FILE}
          ${TEXT_SCAN_FAILURE_OUTPUT_FILE})
+file(TO_NATIVE_PATH
+     "${CMAKE_CURRENT_SOURCE_DIR}/tests/text_scan_faulty_input.txt"
+     TEXT_SCAN_FAULTY_INPUT_FILE)
+file(TO_NATIVE_PATH
+     "${CMAKE_CURRENT_SOURCE_DIR}/tests/text_scan_faulty_golden_output.txt"
+     TEXT_SCAN_FAULTY_GOLDEN_OUTPUT_FILE)
+file(TO_NATIVE_PATH
+     "${CMAKE_CURRENT_BINARY_DIR}/text_scan_faulty_failure_output.txt"
+     TEXT_SCAN_FAULTY_FAILURE_OUTPUT_FILE)
+add_test(TextScanOperator_faulty_unittest
+         TextScanOperator_unittest
+         ${TEXT_SCAN_FAULTY_INPUT_FILE}
+         ${TEXT_SCAN_FAULTY_GOLDEN_OUTPUT_FILE}
+         ${TEXT_SCAN_FAULTY_FAILURE_OUTPUT_FILE})
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 4151bac..aa734d3 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -196,7 +196,9 @@ serialization::WorkOrder* TextScanOperator::createWorkOrderProto(const string &f
 void TextScanWorkOrder::execute() {
   const CatalogRelationSchema &relation = output_destination_->getRelation();
   std::vector<Tuple> tuples;
+  bool is_faulty;
 
+  std::vector<TypedValue> vector_tuple_returned;
   constexpr std::size_t kSmallBufferSize = 0x4000;
   char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize)));
 
@@ -218,7 +220,6 @@ void TextScanWorkOrder::execute() {
   } else {
     --row_ptr;
   }
-
   if (row_ptr >= buffer_end) {
     // This block does not even contain a newline character.
     return;
@@ -238,16 +239,23 @@ void TextScanWorkOrder::execute() {
   // RIGHT AFTER the LAST newline character in this text segment.
 
   // Process the tuples which are between the first newline character and the
-  // last newline character.
+  // last newline character. SKIP any row which is corrupt instead of ABORTING the
+  // whole COPY operation.
   while (row_ptr < end_ptr) {
     if (*row_ptr == '\r' || *row_ptr == '\n') {
       // Skip empty lines.
       ++row_ptr;
     } else {
-      tuples.emplace_back(parseRow(&row_ptr, relation));
+      vector_tuple_returned = parseRow(&row_ptr, relation, &is_faulty);
+      if (is_faulty) {
+          // Skip faulty rows
+          LOG(INFO) << "Faulty row found. Hence switching to next row.";
+      } else {
+            // Convert vector returned to tuple only when a valid row is encountered.
+            tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
+      }
     }
   }
-
   // Process the tuple that is right after the last newline character.
   // NOTE(jianqiao): dynamic_read_size is trying to balance between the cases
   // that the last tuple is very small / very large.
@@ -279,7 +287,15 @@ void TextScanWorkOrder::execute() {
       row_string.push_back('\n');
     }
     row_ptr = row_string.c_str();
-    tuples.emplace_back(parseRow(&row_ptr, relation));
+
+    vector_tuple_returned = parseRow(&row_ptr, relation, &is_faulty);
+    if (is_faulty) {
+        // Skip the faulty row.
+        LOG(INFO) << "Faulty row found. Hence switching to next row.";
+    } else {
+        // Convert vector returned to tuple only when a valid row is encountered.
+        tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
+    }
   }
 
   std::fclose(file);
@@ -312,19 +328,26 @@ void TextScanWorkOrder::execute() {
   output_destination_->bulkInsertTuples(&column_vectors);
 }
 
-Tuple TextScanWorkOrder::parseRow(const char **row_ptr,
-                                  const CatalogRelationSchema &relation) const {
+std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
+                                  const CatalogRelationSchema &relation, bool *is_faulty) const {
   std::vector<TypedValue> attribute_values;
+  // Always assume current row is not faulty initially.
+  *is_faulty = false;
 
   bool is_null_literal;
   bool has_reached_end_of_line = false;
   std::string value_str;
   for (const auto &attr : relation) {
     if (has_reached_end_of_line) {
-      throw TextScanFormatError("Row has too few fields");
+        // Do not abort if one of the rows is faulty.
+        // Set is_faulty to true and SKIP the current row.
+        *is_faulty = true;
+        LOG(INFO) << "Row has too few fields.";
+        return attribute_values;
     }
 
     value_str.clear();
+
     extractFieldString(row_ptr,
                        &is_null_literal,
                        &has_reached_end_of_line,
@@ -333,24 +356,46 @@ Tuple TextScanWorkOrder::parseRow(const char **row_ptr,
     if (is_null_literal) {
       // NULL literal.
       if (!attr.getType().isNullable()) {
-        throw TextScanFormatError(
-            "NULL literal '\\N' was specified for a column with a "
-            "non-nullable Type");
+          *is_faulty = true;
+          LOG(INFO) << "NULL literal '\\N' was specified for a column with a "
+                     "non-nullable Type.";
+          skipFaultyRow(row_ptr);
+          return attribute_values;
       }
       attribute_values.emplace_back(attr.getType().makeNullValue());
     } else {
       attribute_values.emplace_back();
       if (!attr.getType().parseValueFromString(value_str, &(attribute_values.back()))) {
-        throw TextScanFormatError("Failed to parse value");
+          // Do not abort if one of the rows is faulty.
+          *is_faulty = true;
+          LOG(INFO) << "Failed to parse value.";
+          skipFaultyRow(row_ptr);
+          return attribute_values;
       }
     }
   }
 
   if (!has_reached_end_of_line) {
-    throw TextScanFormatError("Row has too many fields");
+      // Do not abort if one of the rows is faulty.
+      // Set is_faulty to true and SKIP the current row.
+      *is_faulty = true;
+      LOG(INFO) << "Row has too many fields.";
+      skipFaultyRow(row_ptr);
   }
 
-  return Tuple(std::move(attribute_values));
+  return attribute_values;
+}
+
+void TextScanWorkOrder::skipFaultyRow(const char **field_ptr) const {
+    // Discard what is left of a faulty row: walk forward to the '\n' that
+    // terminates it and leave *field_ptr on the character right after it.
+    // The walk has no upper bound, so a '\n' must exist at or after the
+    // position *field_ptr points to on entry.
+    const char *scan_pos = *field_ptr;
+    while (*scan_pos != '\n') {
+        ++scan_pos;
+    }
+    *field_ptr = scan_pos + 1;
+}
 
 void TextScanWorkOrder::extractFieldString(const char **field_ptr,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index 24af844..65863b3 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -24,6 +24,7 @@
 #include <cstddef>
 #include <exception>
 #include <string>
+#include <vector>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
@@ -241,6 +242,18 @@ class TextScanWorkOrder : public WorkOrder {
                           std::string *field_string) const;
 
   /**
+   * @brief This method helps incorporate fault tolerance while ingesting data.
+   *        It is called whenever a faulty row is encountered and it is
+   *        required to move \p *field_ptr to the next row.
+   *
+   * @param field_ptr \p *field_ptr points to the current position of the input
+   *        char stream while parsing a faulty row. After the call, \p *field_ptr
+   *        will be modified to the start position of the NEXT record in the
+   *        stream.
+   */
+  void skipFaultyRow(const char **field_ptr) const;
+
+  /**
    * @brief Make a tuple by parsing all of the individual fields from a char stream.
    *
    * @param \p *row_ptr points to the current position of the input char stream
@@ -248,10 +261,12 @@ class TextScanWorkOrder : public WorkOrder {
    *        After the call, \p *row_ptr will be modified to the start position of
    *        the NEXT text row.
    * @param relation The relation schema for the tuple.
+   * @param is_faulty OUTPUT parameter. True if the row is faulty, else false.
    * @return The tuple parsed from the char stream.
    */
-  Tuple parseRow(const char **row_ptr,
-                 const CatalogRelationSchema &relation) const;
+std::vector<TypedValue> parseRow(const char **row_ptr,
+                 const CatalogRelationSchema &relation,
+                 bool *is_faulty) const;
 
   /**
    * @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/tests/text_scan_faulty_golden_output.txt
----------------------------------------------------------------------
diff --git a/relational_operators/tests/text_scan_faulty_golden_output.txt b/relational_operators/tests/text_scan_faulty_golden_output.txt
new file mode 100644
index 0000000..e07bedf
--- /dev/null
+++ b/relational_operators/tests/text_scan_faulty_golden_output.txt
@@ -0,0 +1,5 @@
++--------------------+------------------------+--------------------+-----------------------------------------+----------------------------------------+--------------------+
+|long_attr           |double_attr             |char_attr           |datetime_attr                            |interval_attr                           |varchar_attr        |
++--------------------+------------------------+--------------------+-----------------------------------------+----------------------------------------+--------------------+
+|                1234|                   12.34|                 foo|                      1994-04-27T08:20:50|                        12 days 00:00:00|           right_row|
++--------------------+------------------------+--------------------+-----------------------------------------+----------------------------------------+--------------------+

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/tests/text_scan_faulty_input.txt
----------------------------------------------------------------------
diff --git a/relational_operators/tests/text_scan_faulty_input.txt b/relational_operators/tests/text_scan_faulty_input.txt
new file mode 100644
index 0000000..aa00d39
--- /dev/null
+++ b/relational_operators/tests/text_scan_faulty_input.txt
@@ -0,0 +1,4 @@
+1234	12.34	foo	1994-04-27T08:20:50	12 days	right_row
+1234	abcd	foo	1994-04-27T08:20:50	12 days	row_with_wrong_datatype_value
+1234	foo	1994-04-27T08:20:50	12 days	row_with_less_values
+1234	abcd	foo	1994-04-27T08:20:50	12 days	bar	row_with_more_values