You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/26 18:27:44 UTC

[GitHub] [beam] TheNeuralBit commented on a diff in pull request #23014: Support DECIMAL logical type in python SDK

TheNeuralBit commented on code in PR #23014:
URL: https://github.com/apache/beam/pull/23014#discussion_r980363443


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedPrecisionNumeric.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.Row;
+
+/** Fixed precision numeric types used to represent jdbc NUMERIC and DECIMAL types. */
+public class FixedPrecisionNumeric extends PassThroughLogicalType<BigDecimal> {
+  public static final String IDENTIFIER = "beam:logical_type:fixed_decimal:v1";
+
+  // TODO(https://github.com/apache/beam/issues/19817) implement beam:logical_type:decimal:v1 as
+  // CoderLogicalType (once CoderLogicalType is implemented).
+  /**
+   * Identifier of the unspecified precision numeric type. It corresponds to Java SDK's {@link
+   * FieldType#DECIMAL}. It is the underlying representation type of FixedPrecisionNumeric logical
+   * type in order to be compatible with existing Java field types.
+   */
+  public static final String BASE_IDENTIFIER = "beam:logical_type:decimal:v1";
+
+  private final int precision;
+  private final int scale;
+
+  /**
+   * Create a FixedPrecisionNumeric instance with specified precision and scale. ``precision=-1``
+   * indicates unspecified precision.
+   */
+  public static FixedPrecisionNumeric of(int precision, int scale) {
+    Schema schema = Schema.builder().addInt32Field("precision").addInt32Field("scale").build();
+    return new FixedPrecisionNumeric(schema, precision, scale);
+  }
+
+  /** Create a FixedPrecisionNumeric instance with specified scale and unspecified precision. */
+  public static FixedPrecisionNumeric of(int scale) {
+    return of(-1, scale);
+  }
+
+  /** Create a FixedPrecisionNumeric instance with specified argument row. */
+  public static FixedPrecisionNumeric of(Row row) {
+    final Integer precision = row.getInt32("precision");
+    final Integer scale = row.getInt32("scale");
+    checkArgument(
+        precision != null && scale != null,
+        "precision and scale cannot be null for FixedPrecisionNumeric arguments.");
+    // firstNonNull is used to cast precision and scale to @NonNull input
+    return of(firstNonNull(precision, -1), firstNonNull(scale, 0));
+  }
+
+  private FixedPrecisionNumeric(Schema schema, int precision, int scale) {
+    super(
+        IDENTIFIER,
+        FieldType.row(schema),
+        Row.withSchema(schema).addValues(precision, scale).build(),
+        FieldType.DECIMAL);
+    this.precision = precision;
+    this.scale = scale;
+  }
+
+  @Override
+  public BigDecimal toInputType(BigDecimal base) {
+    checkArgument(
+        base == null
+            || (base.precision() <= precision && base.scale() <= scale)
+            // for cases when received values can be safely coerced to the schema
+            || base.round(new MathContext(precision)).compareTo(base) == 0,
+        "Expected BigDecimal base to be null or have precision <= %s (was %s), scale <= %s (was %s)",
+        precision,
+        (base == null) ? null : base.precision(),
+        scale,
+        (base == null) ? null : base.scale());
+    return base;

Review Comment:
   The logic in this class is a little bit complicated. What do you think about adding a unit test just for this file?



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -267,12 +269,24 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
           logical_type=schema_pb2.LogicalType(urn=PYTHON_ANY_URN),
           nullable=True)
     else:
-      # TODO(bhulette): Add support for logical types that require arguments
-      return schema_pb2.FieldType(
-          logical_type=schema_pb2.LogicalType(
-              urn=logical_type.urn(),
-              representation=self.typing_to_runner_api(
-                  logical_type.representation_type())))
+      if logical_type.argument_type() is None:
+        return schema_pb2.FieldType(
+            logical_type=schema_pb2.LogicalType(
+                urn=logical_type.urn(),
+                representation=self.typing_to_runner_api(
+                    logical_type.representation_type())))
+      else:
+        # TODO(bhulette,yathu): Complete support for logical types that require
+        # arguments.

Review Comment:
   Please file an issue for this and link it here. Also note that option_to_runner_api and option_from_runner_api have logic for working with FieldValues; we should re-use that when we take this on.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java:
##########
@@ -65,7 +67,7 @@
 })
 public class SchemaTranslation {
 
-  private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:logical_type:decimal:v1";
+  private static final String URN_BEAM_LOGICAL_DECIMAL = FixedPrecisionNumeric.BASE_IDENTIFIER;

Review Comment:
   Could you add this in schema.proto and reference that here? Then we can document the format in schema.proto as well.



##########
sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py:
##########
@@ -179,8 +180,8 @@ def test_xlang_jdbc_read(self, database):
     self._setUpTestCase(container_init, db_string, driver)
     table_name = 'jdbc_external_test_read'
     self.engine.execute(
-        "CREATE TABLE {}(f_int INTEGER, f_timestamp TIMESTAMP)".format(
-            table_name))
+        "CREATE TABLE {}(f_int INTEGER, f_timestamp TIMESTAMP, f_decimal DECIMAL(10,2))"  # pylint: disable=line-too-long
+        .format(table_name))

Review Comment:
   Can you also add a test(s) for this in standard_coders.yaml?



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedPrecisionNumeric.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.Row;
+
+/** Fixed precision numeric types used to represent jdbc NUMERIC and DECIMAL types. */
+public class FixedPrecisionNumeric extends PassThroughLogicalType<BigDecimal> {
+  public static final String IDENTIFIER = "beam:logical_type:fixed_decimal:v1";
+
+  // TODO(https://github.com/apache/beam/issues/19817) implement beam:logical_type:decimal:v1 as
+  // CoderLogicalType (once CoderLogicalType is implemented).

Review Comment:
   What is CoderLogicalType? I don't see a reference to it in that issue.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/FixedPrecisionNumeric.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.Row;
+
+/** Fixed precision numeric types used to represent jdbc NUMERIC and DECIMAL types. */
+public class FixedPrecisionNumeric extends PassThroughLogicalType<BigDecimal> {
+  public static final String IDENTIFIER = "beam:logical_type:fixed_decimal:v1";
+
+  // TODO(https://github.com/apache/beam/issues/19817) implement beam:logical_type:decimal:v1 as
+  // CoderLogicalType (once CoderLogicalType is implemented).
+  /**
+   * Identifier of the unspecified precision numeric type. It corresponds to Java SDK's {@link
+   * FieldType#DECIMAL}. It is the underlying representation type of FixedPrecisionNumeric logical
+   * type in order to be compatible with existing Java field types.
+   */
+  public static final String BASE_IDENTIFIER = "beam:logical_type:decimal:v1";
+
+  private final int precision;
+  private final int scale;
+
+  /**
+   * Create a FixedPrecisionNumeric instance with specified precision and scale. ``precision=-1``
+   * indicates unspecified precision.
+   */
+  public static FixedPrecisionNumeric of(int precision, int scale) {
+    Schema schema = Schema.builder().addInt32Field("precision").addInt32Field("scale").build();
+    return new FixedPrecisionNumeric(schema, precision, scale);
+  }
+
+  /** Create a FixedPrecisionNumeric instance with specified scale and unspecified precision. */
+  public static FixedPrecisionNumeric of(int scale) {
+    return of(-1, scale);
+  }
+
+  /** Create a FixedPrecisionNumeric instance with specified argument row. */
+  public static FixedPrecisionNumeric of(Row row) {
+    final Integer precision = row.getInt32("precision");

Review Comment:
   You might add an assertion here to check that `row`'s schema is assignable to the expected schema. I think there are some utilities for this in Schema.java.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org