You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/28 19:34:23 UTC
[10/11] incubator-quickstep git commit: QUICKSTEP-46 fixed
QUICKSTEP-46 fixed
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7f0067b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7f0067b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7f0067b7
Branch: refs/heads/exact-filter
Commit: 7f0067b7913d8f2b68e8ac771fb9a87090d773e9
Parents: 1effc79
Author: tarun <ta...@gmail.com>
Authored: Mon Oct 10 20:29:42 2016 -0500
Committer: tarunbansal <ta...@gmail.com>
Committed: Fri Oct 28 10:02:34 2016 -0500
----------------------------------------------------------------------
relational_operators/CMakeLists.txt | 14 ++++
relational_operators/TextScanOperator.cpp | 73 ++++++++++++++++----
relational_operators/TextScanOperator.hpp | 19 ++++-
.../tests/text_scan_faulty_golden_output.txt | 5 ++
.../tests/text_scan_faulty_input.txt | 4 ++
5 files changed, 99 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 8dd65d0..0735bce 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -784,3 +784,17 @@ add_test(TextScanOperator_unittest
${TEXT_SCAN_INPUT_FILE}
${TEXT_SCAN_GOLDEN_OUTPUT_FILE}
${TEXT_SCAN_FAILURE_OUTPUT_FILE})
+file(TO_NATIVE_PATH
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/text_scan_faulty_input.txt"
+ TEXT_SCAN_FAULTY_INPUT_FILE)
+file(TO_NATIVE_PATH
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/text_scan_faulty_golden_output.txt"
+ TEXT_SCAN_FAULTY_GOLDEN_OUTPUT_FILE)
+file(TO_NATIVE_PATH
+ "${CMAKE_CURRENT_BINARY_DIR}/text_scan_faulty_failure_output.txt"
+ TEXT_SCAN_FAULTY_FAILURE_OUTPUT_FILE)
+add_test(TextScanOperator_faulty_unittest
+ TextScanOperator_unittest
+ ${TEXT_SCAN_FAULTY_INPUT_FILE}
+ ${TEXT_SCAN_FAULTY_GOLDEN_OUTPUT_FILE}
+ ${TEXT_SCAN_FAULTY_FAILURE_OUTPUT_FILE})
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 4151bac..aa734d3 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -196,7 +196,9 @@ serialization::WorkOrder* TextScanOperator::createWorkOrderProto(const string &f
void TextScanWorkOrder::execute() {
const CatalogRelationSchema &relation = output_destination_->getRelation();
std::vector<Tuple> tuples;
+ bool is_faulty;
+ std::vector<TypedValue> vector_tuple_returned;
constexpr std::size_t kSmallBufferSize = 0x4000;
char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize)));
@@ -218,7 +220,6 @@ void TextScanWorkOrder::execute() {
} else {
--row_ptr;
}
-
if (row_ptr >= buffer_end) {
// This block does not even contain a newline character.
return;
@@ -238,16 +239,23 @@ void TextScanWorkOrder::execute() {
// RIGHT AFTER the LAST newline character in this text segment.
// Process the tuples which are between the first newline character and the
- // last newline character.
+ // last newline character. SKIP any row which is corrupt instead of ABORTING the
+ // whole COPY operation.
while (row_ptr < end_ptr) {
if (*row_ptr == '\r' || *row_ptr == '\n') {
// Skip empty lines.
++row_ptr;
} else {
- tuples.emplace_back(parseRow(&row_ptr, relation));
+ vector_tuple_returned = parseRow(&row_ptr, relation, &is_faulty);
+ if (is_faulty) {
+ // Skip faulty rows
+ LOG(INFO) << "Faulty row found. Hence switching to next row.";
+ } else {
+ // Convert vector returned to tuple only when a valid row is encountered.
+ tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
+ }
}
}
-
// Process the tuple that is right after the last newline character.
// NOTE(jianqiao): dynamic_read_size is trying to balance between the cases
// that the last tuple is very small / very large.
@@ -279,7 +287,15 @@ void TextScanWorkOrder::execute() {
row_string.push_back('\n');
}
row_ptr = row_string.c_str();
- tuples.emplace_back(parseRow(&row_ptr, relation));
+
+ vector_tuple_returned = parseRow(&row_ptr, relation, &is_faulty);
+ if (is_faulty) {
+ // Skip the faulty row.
+ LOG(INFO) << "Faulty row found. Hence switching to next row.";
+ } else {
+ // Convert vector returned to tuple only when a valid row is encountered.
+ tuples.emplace_back(Tuple(std::move(vector_tuple_returned)));
+ }
}
std::fclose(file);
@@ -312,19 +328,26 @@ void TextScanWorkOrder::execute() {
output_destination_->bulkInsertTuples(&column_vectors);
}
-Tuple TextScanWorkOrder::parseRow(const char **row_ptr,
- const CatalogRelationSchema &relation) const {
+std::vector<TypedValue> TextScanWorkOrder::parseRow(const char **row_ptr,
+ const CatalogRelationSchema &relation, bool *is_faulty) const {
std::vector<TypedValue> attribute_values;
+ // Always assume current row is not faulty initially.
+ *is_faulty = false;
bool is_null_literal;
bool has_reached_end_of_line = false;
std::string value_str;
for (const auto &attr : relation) {
if (has_reached_end_of_line) {
- throw TextScanFormatError("Row has too few fields");
+ // Do not abort if one of the row is faulty.
+ // Set is_faulty to true and SKIP the current row.
+ *is_faulty = true;
+ LOG(INFO) << "Row has too few fields.";
+ return attribute_values;
}
value_str.clear();
+
extractFieldString(row_ptr,
&is_null_literal,
&has_reached_end_of_line,
@@ -333,24 +356,46 @@ Tuple TextScanWorkOrder::parseRow(const char **row_ptr,
if (is_null_literal) {
// NULL literal.
if (!attr.getType().isNullable()) {
- throw TextScanFormatError(
- "NULL literal '\\N' was specified for a column with a "
- "non-nullable Type");
+ *is_faulty = true;
+ LOG(INFO) << "NULL literal '\\N' was specified for a column with a "
+ "non-nullable Type.";
+ skipFaultyRow(row_ptr);
+ return attribute_values;
}
attribute_values.emplace_back(attr.getType().makeNullValue());
} else {
attribute_values.emplace_back();
if (!attr.getType().parseValueFromString(value_str, &(attribute_values.back()))) {
- throw TextScanFormatError("Failed to parse value");
+ // Do not abort if one of the row is faulty.
+ *is_faulty = true;
+ LOG(INFO) << "Failed to parse value.";
+ skipFaultyRow(row_ptr);
+ return attribute_values;
}
}
}
if (!has_reached_end_of_line) {
- throw TextScanFormatError("Row has too many fields");
+ // Do not abort if one of the row is faulty.
+ // Set is_faulty to true and SKIP the current row.
+ *is_faulty = true;
+ LOG(INFO) << "Row has too many fields.";
+ skipFaultyRow(row_ptr);
}
- return Tuple(std::move(attribute_values));
+ return attribute_values;
+}
+
+void TextScanWorkOrder::skipFaultyRow(const char **field_ptr) const {  // Discards the remainder of a rejected row.
+  const char *cur_ptr = *field_ptr;
+  // Advance to the next '\n'; *field_ptr is left just past it (the start of the next row).
+  for (;; ++cur_ptr) {  // NOTE(review): unbounded scan, no end-of-buffer check; relies on a '\n' existing at or after *field_ptr.
+    const char c = *cur_ptr;
+    if (c == '\n') {  // NOTE(review): if the caller already consumed this row's '\n' (bad value in the LAST field), the NEXT row is skipped too; confirm vs extractFieldString.
+      break;
+    }
+  }
+  *field_ptr = cur_ptr + 1;  // Step over the '\n' itself.
}
void TextScanWorkOrder::extractFieldString(const char **field_ptr,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index 24af844..65863b3 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -24,6 +24,7 @@
#include <cstddef>
#include <exception>
#include <string>
+#include <vector>
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
@@ -241,6 +242,18 @@ class TextScanWorkOrder : public WorkOrder {
std::string *field_string) const;
/**
+ * @brief This method helps incorporate fault tolerance while ingesting data.
+ * It is called whenever a faulty row is encountered and it is
+ * required to move \p *field_ptr to the next row.
+ *
+ * @param field_ptr \p *field_ptr points to the current position of the input
+ * char stream while parsing a faulty row. After the call, \p *field_ptr
+ * will be modified to the start position of the NEXT record in the
+ * stream.
+ */
+ void skipFaultyRow(const char **field_ptr) const;
+
+ /**
* @brief Make a tuple by parsing all of the individual fields from a char stream.
*
* @param \p *row_ptr points to the current position of the input char stream
@@ -248,10 +261,12 @@ class TextScanWorkOrder : public WorkOrder {
* After the call, \p *row_ptr will be modified to the start position of
* the NEXT text row.
* @param relation The relation schema for the tuple.
+ * @param is_faulty OUTPUT parameter. Set to true if the row is faulty,
* @return The tuple parsed from the char stream.
*/
- Tuple parseRow(const char **row_ptr,
- const CatalogRelationSchema &relation) const;
+std::vector<TypedValue> parseRow(const char **row_ptr,
+ const CatalogRelationSchema &relation,
+ bool *is_faulty) const;
/**
* @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/tests/text_scan_faulty_golden_output.txt
----------------------------------------------------------------------
diff --git a/relational_operators/tests/text_scan_faulty_golden_output.txt b/relational_operators/tests/text_scan_faulty_golden_output.txt
new file mode 100644
index 0000000..e07bedf
--- /dev/null
+++ b/relational_operators/tests/text_scan_faulty_golden_output.txt
@@ -0,0 +1,5 @@
++--------------------+------------------------+--------------------+-----------------------------------------+----------------------------------------+--------------------+
+|long_attr |double_attr |char_attr |datetime_attr |interval_attr |varchar_attr |
++--------------------+------------------------+--------------------+-----------------------------------------+----------------------------------------+--------------------+
+| 1234| 12.34| foo| 1994-04-27T08:20:50| 12 days 00:00:00| right_row|
++--------------------+------------------------+--------------------+-----------------------------------------+----------------------------------------+--------------------+
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7f0067b7/relational_operators/tests/text_scan_faulty_input.txt
----------------------------------------------------------------------
diff --git a/relational_operators/tests/text_scan_faulty_input.txt b/relational_operators/tests/text_scan_faulty_input.txt
new file mode 100644
index 0000000..aa00d39
--- /dev/null
+++ b/relational_operators/tests/text_scan_faulty_input.txt
@@ -0,0 +1,4 @@
+1234 12.34 foo 1994-04-27T08:20:50 12 days right_row
+1234 abcd foo 1994-04-27T08:20:50 12 days row_with_wrong_datatype_value
+1234 foo 1994-04-27T08:20:50 12 days row_with_less_values
+1234 abcd foo 1994-04-27T08:20:50 12 days bar row_with_more_values