You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/12/12 22:22:28 UTC

parquet-cpp git commit: PARQUET-785: LIST schema conversion for Arrow lists

Repository: parquet-cpp
Updated Branches:
  refs/heads/master a1517582f -> 8487142f6


PARQUET-785: LIST schema conversion for Arrow lists

Author: Korn, Uwe <Uw...@blue-yonder.com>
Author: Uwe L. Korn <uw...@xhochy.com>

Closes #198 from xhochy/PARQUET-785 and squashes the following commits:

cc173e1 [Uwe L. Korn] Add 1-level list encoding
467c611 [Korn, Uwe] PARQUET-785: LIST schema conversion for Arrow lists


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/8487142f
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/8487142f
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/8487142f

Branch: refs/heads/master
Commit: 8487142f6d5a60d12e3068ac226b2b5dfe178350
Parents: a151758
Author: Korn, Uwe <Uw...@blue-yonder.com>
Authored: Mon Dec 12 17:22:21 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Dec 12 17:22:21 2016 -0500

----------------------------------------------------------------------
 src/parquet/arrow/arrow-schema-test.cc | 230 +++++++++++++++++++++++++++-
 src/parquet/arrow/schema.cc            | 163 ++++++++++++++------
 2 files changed, 347 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8487142f/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 3dfaf14..3437e71 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -157,15 +157,194 @@ TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) {
   CheckFlatSchema(arrow_schema);
 }
 
+TEST_F(TestConvertParquetSchema, ParquetLists) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  // LIST encoding example taken from parquet-format/LogicalTypes.md
+
+  // // List<String> (list non-null, elements nullable)
+  // required group my_list (LIST) {
+  //   repeated group list {
+  //     optional binary element (UTF8);
+  //   }
+  // }
+  {
+    auto element = PrimitiveNode::Make(
+        "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8);
+    auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
+    parquet_fields.push_back(
+        GroupNode::Make("my_list", Repetition::REQUIRED, {list}, LogicalType::LIST));
+    auto arrow_element = std::make_shared<Field>("string", UTF8, true);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, false));
+  }
+
+  // // List<String> (list nullable, elements non-null)
+  // optional group my_list (LIST) {
+  //   repeated group list {
+  //     required binary element (UTF8);
+  //   }
+  // }
+  {
+    auto element = PrimitiveNode::Make(
+        "string", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, LogicalType::UTF8);
+    auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
+    parquet_fields.push_back(
+        GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, LogicalType::LIST));
+    auto arrow_element = std::make_shared<Field>("string", UTF8, false);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+  }
+
+  // Element types can be nested structures. For example, a list of lists:
+  //
+  // // List<List<Integer>>
+  // optional group array_of_arrays (LIST) {
+  //   repeated group list {
+  //     required group element (LIST) {
+  //       repeated group list {
+  //         required int32 element;
+  //       }
+  //     }
+  //   }
+  // }
+  {
+    auto inner_element =
+        PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32);
+    auto inner_list = GroupNode::Make("list", Repetition::REPEATED, {inner_element});
+    auto element =
+        GroupNode::Make("element", Repetition::REQUIRED, {inner_list}, LogicalType::LIST);
+    auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
+    parquet_fields.push_back(GroupNode::Make(
+        "array_of_arrays", Repetition::OPTIONAL, {list}, LogicalType::LIST));
+    auto arrow_inner_element = std::make_shared<Field>("int32", INT32, false);
+    auto arrow_inner_list = std::make_shared<::arrow::ListType>(arrow_inner_element);
+    auto arrow_element = std::make_shared<Field>("element", arrow_inner_list, false);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("array_of_arrays", arrow_list, true));
+  }
+
+  // // List<String> (list nullable, elements non-null)
+  // optional group my_list (LIST) {
+  //   repeated group element {
+  //     required binary str (UTF8);
+  //   };
+  // }
+  {
+    auto element = PrimitiveNode::Make(
+        "str", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, LogicalType::UTF8);
+    auto list = GroupNode::Make("element", Repetition::REPEATED, {element});
+    parquet_fields.push_back(
+        GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, LogicalType::LIST));
+    auto arrow_element = std::make_shared<Field>("str", UTF8, false);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+  }
+
+  // // List<Integer> (nullable list, non-null elements)
+  // optional group my_list (LIST) {
+  //   repeated int32 element;
+  // }
+  {
+    auto element =
+        PrimitiveNode::Make("element", Repetition::REPEATED, ParquetType::INT32);
+    parquet_fields.push_back(
+        GroupNode::Make("my_list", Repetition::OPTIONAL, {element}, LogicalType::LIST));
+    auto arrow_element = std::make_shared<Field>("element", INT32, false);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+  }
+
+  // // List<Tuple<String, Integer>> (nullable list, non-null elements)
+  // optional group my_list (LIST) {
+  //   repeated group element {
+  //     required binary str (UTF8);
+  //     required int32 num;
+  //   };
+  // }
+  {
+    auto str_element = PrimitiveNode::Make(
+        "str", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, LogicalType::UTF8);
+    auto num_element =
+        PrimitiveNode::Make("num", Repetition::REQUIRED, ParquetType::INT32);
+    auto element =
+        GroupNode::Make("element", Repetition::REPEATED, {str_element, num_element});
+    parquet_fields.push_back(
+        GroupNode::Make("my_list", Repetition::OPTIONAL, {element}, LogicalType::LIST));
+    auto arrow_str = std::make_shared<Field>("str", UTF8, false);
+    auto arrow_num = std::make_shared<Field>("num", INT32, false);
+    std::vector<std::shared_ptr<Field>> fields({arrow_str, arrow_num});
+    auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
+    auto arrow_element = std::make_shared<Field>("element", arrow_struct, false);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+  }
+
+  // // List<OneTuple<String>> (nullable list, non-null elements)
+  // optional group my_list (LIST) {
+  //   repeated group array {
+  //     required binary str (UTF8);
+  //   };
+  // }
+  // Special case: group is named array
+  {
+    auto element = PrimitiveNode::Make(
+        "str", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, LogicalType::UTF8);
+    auto array = GroupNode::Make("array", Repetition::REPEATED, {element});
+    parquet_fields.push_back(
+        GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, LogicalType::LIST));
+    auto arrow_str = std::make_shared<Field>("str", UTF8, false);
+    std::vector<std::shared_ptr<Field>> fields({arrow_str});
+    auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
+    auto arrow_element = std::make_shared<Field>("array", arrow_struct, false);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+  }
+
+  // // List<OneTuple<String>> (nullable list, non-null elements)
+  // optional group my_list (LIST) {
+  //   repeated group my_list_tuple {
+  //     required binary str (UTF8);
+  //   };
+  // }
+  // Special case: group name ends in _tuple
+  {
+    auto element = PrimitiveNode::Make(
+        "str", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, LogicalType::UTF8);
+    auto array = GroupNode::Make("my_list_tuple", Repetition::REPEATED, {element});
+    parquet_fields.push_back(
+        GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, LogicalType::LIST));
+    auto arrow_str = std::make_shared<Field>("str", UTF8, false);
+    std::vector<std::shared_ptr<Field>> fields({arrow_str});
+    auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
+    auto arrow_element = std::make_shared<Field>("my_list_tuple", arrow_struct, false);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+  }
+
+  // One-level encoding: Only allows required lists with required cells
+  //   repeated value_type name
+  {
+    parquet_fields.push_back(
+        PrimitiveNode::Make("name", Repetition::REPEATED, ParquetType::INT32));
+    auto arrow_element = std::make_shared<Field>("name", INT32, false);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("name", arrow_list, false));
+  }
+
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
+
+  CheckFlatSchema(arrow_schema);
+}
+
 TEST_F(TestConvertParquetSchema, UnsupportedThings) {
   std::vector<NodePtr> unsupported_nodes;
 
   unsupported_nodes.push_back(
       PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96));
 
-  unsupported_nodes.push_back(
-      GroupNode::Make("repeated-group", Repetition::REPEATED, {}));
-
   unsupported_nodes.push_back(PrimitiveNode::Make(
       "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
 
@@ -247,6 +426,51 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
   CheckFlatSchema(parquet_fields);
 }
 
+TEST_F(TestConvertArrowSchema, ParquetLists) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  // parquet_arrow will always generate 3-level LIST encodings
+
+  // // List<String> (list non-null, elements nullable)
+  // required group my_list (LIST) {
+  //   repeated group list {
+  //     optional binary element (UTF8);
+  //   }
+  // }
+  {
+    auto element = PrimitiveNode::Make(
+        "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8);
+    auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
+    parquet_fields.push_back(
+        GroupNode::Make("my_list", Repetition::REQUIRED, {list}, LogicalType::LIST));
+    auto arrow_element = std::make_shared<Field>("string", UTF8, true);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, false));
+  }
+
+  // // List<String> (list nullable, elements non-null)
+  // optional group my_list (LIST) {
+  //   repeated group list {
+  //     required binary element (UTF8);
+  //   }
+  // }
+  {
+    auto element = PrimitiveNode::Make(
+        "string", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, LogicalType::UTF8);
+    auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
+    parquet_fields.push_back(
+        GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, LogicalType::LIST));
+    auto arrow_element = std::make_shared<Field>("string", UTF8, false);
+    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
+    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+  }
+
+  ASSERT_OK(ConvertSchema(arrow_fields));
+
+  CheckFlatSchema(parquet_fields);
+}
+
 TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
   std::vector<NodePtr> parquet_fields;
   std::vector<std::shared_ptr<Field>> arrow_fields;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8487142f/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 5a38a28..fe1db7a 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -152,56 +152,118 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
   return Status::OK();
 }
 
-// TODO: Logical Type Handling
+Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
+  switch (primitive->physical_type()) {
+    case ParquetType::BOOLEAN:
+      *out = BOOL;
+      break;
+    case ParquetType::INT32:
+      RETURN_NOT_OK(FromInt32(primitive, out));
+      break;
+    case ParquetType::INT64:
+      RETURN_NOT_OK(FromInt64(primitive, out));
+      break;
+    case ParquetType::INT96:
+      // TODO: Do we have that type in Arrow?
+      // type = TypePtr(new Int96Type());
+      return Status::NotImplemented("int96");
+    case ParquetType::FLOAT:
+      *out = FLOAT;
+      break;
+    case ParquetType::DOUBLE:
+      *out = DOUBLE;
+      break;
+    case ParquetType::BYTE_ARRAY:
+      // TODO: Do we have that type in Arrow?
+      RETURN_NOT_OK(FromByteArray(primitive, out));
+      break;
+    case ParquetType::FIXED_LEN_BYTE_ARRAY:
+      RETURN_NOT_OK(FromFLBA(primitive, out));
+      break;
+  }
+  return Status::OK();
+}
+
+Status StructFromGroup(const GroupNode* group, TypePtr* out) {
+  std::vector<std::shared_ptr<Field>> fields(group->field_count());
+  for (int i = 0; i < group->field_count(); i++) {
+    RETURN_NOT_OK(NodeToField(group->field(i), &fields[i]));
+  }
+  *out = std::make_shared<::arrow::StructType>(fields);
+  return Status::OK();
+}
+
+bool str_endswith_tuple(const std::string& str) {
+  if (str.size() >= 6) { return str.substr(str.size() - 6, 6) == "_tuple"; }
+  return false;
+}
+
+Status NodeToList(const GroupNode* group, TypePtr* out) {
+  if (group->field_count() == 1) {
+    // This attempts to resolve the preferred 3-level list encoding.
+    NodePtr list_node = group->field(0);
+    if (list_node->is_group() && list_node->is_repeated()) {
+      const GroupNode* list_group = static_cast<const GroupNode*>(list_node.get());
+      // Special case mentioned in the format spec:
+      //   If the name is array or ends in _tuple, this should be a list of struct
+      //   even for single child elements.
+      if (list_group->field_count() == 1 && list_node->name() != "array" &&
+          !str_endswith_tuple(list_node->name())) {
+        // List of a single element type (primitive or nested group)
+        std::shared_ptr<Field> item_field;
+        RETURN_NOT_OK(NodeToField(list_group->field(0), &item_field));
+        *out = std::make_shared<::arrow::ListType>(item_field);
+      } else {
+        // List of struct
+        std::shared_ptr<::arrow::DataType> inner_type;
+        RETURN_NOT_OK(StructFromGroup(list_group, &inner_type));
+        auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
+        *out = std::make_shared<::arrow::ListType>(item_field);
+      }
+    } else if (list_node->is_repeated()) {
+      // repeated primitive node
+      std::shared_ptr<::arrow::DataType> inner_type;
+      const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(list_node.get());
+      RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
+      auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
+      *out = std::make_shared<::arrow::ListType>(item_field);
+    } else {
+      return Status::NotImplemented(
+          "Non-repeated groups in a LIST-annotated group are not supported.");
+    }
+  } else {
+    return Status::NotImplemented(
+        "Only LIST-annotated groups with a single child can be handled.");
+  }
+  return Status::OK();
+}
+
 Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
   std::shared_ptr<::arrow::DataType> type;
+  bool nullable = !node->is_required();
 
   if (node->is_repeated()) {
-    return Status::NotImplemented("No support yet for repeated node types");
-  }
-
-  if (node->is_group()) {
+    // 1-level LIST encoding fields are required
+    std::shared_ptr<::arrow::DataType> inner_type;
+    const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
+    RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
+    auto item_field = std::make_shared<Field>(node->name(), inner_type, false);
+    type = std::make_shared<::arrow::ListType>(item_field);
+    nullable = false;
+  } else if (node->is_group()) {
     const GroupNode* group = static_cast<const GroupNode*>(node.get());
-    std::vector<std::shared_ptr<Field>> fields(group->field_count());
-    for (int i = 0; i < group->field_count(); i++) {
-      RETURN_NOT_OK(NodeToField(group->field(i), &fields[i]));
+    if (node->logical_type() == LogicalType::LIST) {
+      RETURN_NOT_OK(NodeToList(group, &type));
+    } else {
+      RETURN_NOT_OK(StructFromGroup(group, &type));
     }
-    type = std::make_shared<::arrow::StructType>(fields);
   } else {
     // Primitive (leaf) node
     const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
-
-    switch (primitive->physical_type()) {
-      case ParquetType::BOOLEAN:
-        type = BOOL;
-        break;
-      case ParquetType::INT32:
-        RETURN_NOT_OK(FromInt32(primitive, &type));
-        break;
-      case ParquetType::INT64:
-        RETURN_NOT_OK(FromInt64(primitive, &type));
-        break;
-      case ParquetType::INT96:
-        // TODO: Do we have that type in Arrow?
-        // type = TypePtr(new Int96Type());
-        return Status::NotImplemented("int96");
-      case ParquetType::FLOAT:
-        type = FLOAT;
-        break;
-      case ParquetType::DOUBLE:
-        type = DOUBLE;
-        break;
-      case ParquetType::BYTE_ARRAY:
-        // TODO: Do we have that type in Arrow?
-        RETURN_NOT_OK(FromByteArray(primitive, &type));
-        break;
-      case ParquetType::FIXED_LEN_BYTE_ARRAY:
-        RETURN_NOT_OK(FromFLBA(primitive, &type));
-        break;
-    }
+    RETURN_NOT_OK(FromPrimitive(primitive, &type));
   }
 
-  *out = std::make_shared<Field>(node->name(), type, !node->is_required());
+  *out = std::make_shared<Field>(node->name(), type, nullable);
   return Status::OK();
 }
 
@@ -220,11 +282,22 @@ Status FromParquetSchema(
   return Status::OK();
 }
 
+Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
+    bool nullable, const WriterProperties& properties, NodePtr* out) {
+  Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
+
+  NodePtr element;
+  RETURN_NOT_OK(FieldToNode(type->value_field(), properties, &element));
+
+  NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element});
+  *out = GroupNode::Make(name, repetition, {list}, LogicalType::LIST);
+  return Status::OK();
+}
+
 Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
     const std::string& name, bool nullable, const WriterProperties& properties,
     NodePtr* out) {
-  Repetition::type repetition = Repetition::REQUIRED;
-  if (nullable) { repetition = Repetition::OPTIONAL; }
+  Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
 
   std::vector<NodePtr> children(type->num_children());
   for (int i = 0; i < type->num_children(); i++) {
@@ -239,8 +312,8 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
     const WriterProperties& properties, NodePtr* out) {
   LogicalType::type logical_type = LogicalType::NONE;
   ParquetType::type type;
-  Repetition::type repetition = Repetition::REQUIRED;
-  if (field->nullable) { repetition = Repetition::OPTIONAL; }
+  Repetition::type repetition =
+      field->nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
   int length = -1;
 
   switch (field->type->type) {
@@ -324,6 +397,10 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
       auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type);
       return StructToNode(struct_type, field->name, field->nullable, properties, out);
     } break;
+    case ArrowType::LIST: {
+      auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type);
+      return ListToNode(list_type, field->name, field->nullable, properties, out);
+    } break;
     default:
       // TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
       return Status::NotImplemented("unhandled type");