You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/07 01:08:30 UTC

[GitHub] [beam] y1chi commented on a diff in pull request #22133: Add `schema_options` and `field_options` on RowTypeConstraint

y1chi commented on code in PR #22133:
URL: https://github.com/apache/beam/pull/22133#discussion_r915342721


##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -260,6 +270,86 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
               representation=self.typing_to_runner_api(
                   logical_type.representation_type())))
 
+  def option_from_runner_api(
+      self, option_proto: schema_pb2.Option) -> Tuple[str, Any]:
+    if not option_proto.HasField('type'):
+      return option_proto.name, None
+
+    fieldtype_proto = option_proto.type
+    if not fieldtype_proto.WhichOneof("type_info") == "atomic_type":
+      raise ValueError(
+          "Encounterd option with unsupported type. Only "
+          f"atomic_type options are supported: {option_proto}")
+
+    if fieldtype_proto.atomic_type == schema_pb2.BYTE:
+      value = np.int8(option_proto.value.atomic_value.byte)
+    elif fieldtype_proto.atomic_type == schema_pb2.INT16:
+      value = np.int16(option_proto.value.atomic_value.int16)
+    elif fieldtype_proto.atomic_type == schema_pb2.INT32:
+      value = np.int32(option_proto.value.atomic_value.int32)
+    elif fieldtype_proto.atomic_type == schema_pb2.INT64:
+      value = np.int64(option_proto.value.atomic_value.int64)
+    elif fieldtype_proto.atomic_type == schema_pb2.FLOAT:
+      value = np.float32(option_proto.value.atomic_value.float)
+    elif fieldtype_proto.atomic_type == schema_pb2.DOUBLE:
+      value = np.float64(option_proto.value.atomic_value.double)
+    elif fieldtype_proto.atomic_type == schema_pb2.STRING:
+      value = option_proto.value.atomic_value.string
+    elif fieldtype_proto.atomic_type == schema_pb2.BOOLEAN:
+      value = option_proto.value.atomic_value.boolean
+    elif fieldtype_proto.atomic_type == schema_pb2.BYTES:
+      value = option_proto.value.atomic_value.bytes
+    else:
+      raise ValueError(
+          "Unrecognized atomic_type ({fieldtype_proto.atomic_type}) "

Review Comment:
   f""?



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -260,6 +270,86 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
               representation=self.typing_to_runner_api(
                   logical_type.representation_type())))
 
+  def option_from_runner_api(
+      self, option_proto: schema_pb2.Option) -> Tuple[str, Any]:
+    if not option_proto.HasField('type'):
+      return option_proto.name, None
+
+    fieldtype_proto = option_proto.type
+    if not fieldtype_proto.WhichOneof("type_info") == "atomic_type":

Review Comment:
   use `!=`?



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -260,6 +270,86 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
               representation=self.typing_to_runner_api(
                   logical_type.representation_type())))
 
+  def option_from_runner_api(
+      self, option_proto: schema_pb2.Option) -> Tuple[str, Any]:
+    if not option_proto.HasField('type'):
+      return option_proto.name, None
+
+    fieldtype_proto = option_proto.type
+    if not fieldtype_proto.WhichOneof("type_info") == "atomic_type":
+      raise ValueError(
+          "Encounterd option with unsupported type. Only "
+          f"atomic_type options are supported: {option_proto}")
+
+    if fieldtype_proto.atomic_type == schema_pb2.BYTE:
+      value = np.int8(option_proto.value.atomic_value.byte)
+    elif fieldtype_proto.atomic_type == schema_pb2.INT16:
+      value = np.int16(option_proto.value.atomic_value.int16)
+    elif fieldtype_proto.atomic_type == schema_pb2.INT32:
+      value = np.int32(option_proto.value.atomic_value.int32)
+    elif fieldtype_proto.atomic_type == schema_pb2.INT64:
+      value = np.int64(option_proto.value.atomic_value.int64)
+    elif fieldtype_proto.atomic_type == schema_pb2.FLOAT:
+      value = np.float32(option_proto.value.atomic_value.float)
+    elif fieldtype_proto.atomic_type == schema_pb2.DOUBLE:
+      value = np.float64(option_proto.value.atomic_value.double)
+    elif fieldtype_proto.atomic_type == schema_pb2.STRING:
+      value = option_proto.value.atomic_value.string
+    elif fieldtype_proto.atomic_type == schema_pb2.BOOLEAN:
+      value = option_proto.value.atomic_value.boolean
+    elif fieldtype_proto.atomic_type == schema_pb2.BYTES:
+      value = option_proto.value.atomic_value.bytes
+    else:
+      raise ValueError(
+          "Unrecognized atomic_type ({fieldtype_proto.atomic_type}) "
+          "when decoding option {option_proto!r}")
+
+    return option_proto.name, value
+
+  def option_to_runner_api(self, option: Tuple[str, Any]) -> schema_pb2.Option:
+    name, value = option
+
+    if value is None:
+      # a value of None indicates the option is just a flag.
+      # Don't set type, value
+      return schema_pb2.Option(name=name)
+
+    fieldtype_proto = self.typing_to_runner_api(type(value))
+    if not fieldtype_proto.WhichOneof("type_info") == "atomic_type":

Review Comment:
   use `!=` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org