You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by yh...@apache.org on 2022/12/02 13:42:45 UTC

[beam] 01/01: Revert "Support SqlTypes Date and Timestamp (MicrosInstant) in AvroUtils (#23969)"

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch revert-23969-sqlttypes
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e48b329e7a7e868ed38f7e8e0973957d81ed8678
Author: Yi Hu <hu...@gmail.com>
AuthorDate: Fri Dec 2 08:42:34 2022 -0500

    Revert "Support SqlTypes Date and Timestamp (MicrosInstant) in AvroUtils (#23969)"
    
    This reverts commit d5d76b974592d45de368ab641647ca5cc4ec12ec.
---
 .../apache/beam/sdk/schemas/utils/AvroUtils.java   | 91 ++--------------------
 .../beam/sdk/schemas/utils/AvroUtilsTest.java      | 33 --------
 2 files changed, 6 insertions(+), 118 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index e61dbe505e2..8d01ed0406a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -34,7 +34,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
@@ -77,7 +76,6 @@ import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
-import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
@@ -89,7 +87,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -101,42 +98,7 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 
-/**
- * Utils to convert AVRO records to Beam rows. Imposes a mapping between common avro types and Beam
- * portable schemas (https://s.apache.org/beam-schemas):
- *
- * <pre>
- *   Avro                Beam Field Type
- *   INT         <-----> INT32
- *   LONG        <-----> INT64
- *   FLOAT       <-----> FLOAT
- *   DOUBLE      <-----> DOUBLE
- *   BOOLEAN     <-----> BOOLEAN
- *   STRING      <-----> STRING
- *   BYTES       <-----> BYTES
- *               <------ LogicalType(urn="beam:logical_type:var_bytes:v1")
- *   FIXED       <-----> LogicalType(urn="beam:logical_type:fixed_bytes:v1")
- *   ARRAY       <-----> ARRAY
- *   ENUM        <-----> LogicalType(EnumerationType)
- *   MAP         <-----> MAP
- *   RECORD      <-----> ROW
- *   UNION       <-----> LogicalType(OneOfType)
- *   LogicalTypes.Date              <-----> LogicalType(DATE)
- *                                  <------ LogicalType(urn="beam:logical_type:date:v1")
- *   LogicalTypes.TimestampMillis   <-----> DATETIME
- *   LogicalTypes.TimestampMicros   <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
- *   LogicalTypes.Decimal           <-----> DECIMAL
- * </pre>
- *
- * For SQL CHAR/VARCHAR types, an Avro schema
- *
- * <pre>
- *   LogicalType({"type":"string","logicalType":"char","maxLength":MAX_LENGTH}) or
- *   LogicalType({"type":"string","logicalType":"varchar","maxLength":MAX_LENGTH})
- * </pre>
- *
- * is used.
- */
+/** Utils to convert AVRO records to Beam rows. */
 @Experimental(Kind.SCHEMAS)
 @SuppressWarnings({
   "nullness", // TODO(https://github.com/apache/beam/issues/20497)
@@ -810,11 +772,9 @@ public class AvroUtils {
       if (logicalType instanceof LogicalTypes.Decimal) {
         fieldType = FieldType.DECIMAL;
       } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
-        // TODO(https://github.com/apache/beam/issues/19215) DATETIME primitive will be removed when
-        // fully migrates to java.time lib from joda-time
+        // TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
+        // this is done, this logical type needs to be changed.
         fieldType = FieldType.DATETIME;
-      } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-        fieldType = FieldType.logicalType(SqlTypes.TIMESTAMP);
       } else if (logicalType instanceof LogicalTypes.Date) {
         fieldType = FieldType.DATETIME;
       }
@@ -927,8 +887,8 @@ public class AvroUtils {
         break;
 
       case DATETIME:
-        // TODO(https://github.com/apache/beam/issues/19215) DATETIME primitive will be removed when
-        // fully migrates to java.time lib from joda-time
+        // TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
+        // this is done, this logical type needs to be changed.
         baseType =
             LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG));
         break;
@@ -973,13 +933,10 @@ public class AvroUtils {
                   oneOfType.getOneOfSchema().getFields().stream()
                       .map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
                       .collect(Collectors.toList()));
-        } else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) {
+        } else if ("DATE".equals(identifier)) {
           baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
         } else if ("TIME".equals(identifier)) {
           baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
-        } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
-          baseType =
-              LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
         } else {
           throw new RuntimeException(
               "Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
@@ -1104,17 +1061,9 @@ public class AvroUtils {
                 oneOfValue.getValue());
           }
         } else if ("DATE".equals(identifier)) {
-          // "Date" is backed by joda.time.Instant
           return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays();
-        } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
-          // portable SqlTypes.DATE is backed by java.time.LocalDate
-          return ((java.time.LocalDate) value).toEpochDay();
         } else if ("TIME".equals(identifier)) {
-          // "TIME" is backed by joda.time.Instant
           return (int) ((Instant) value).getMillis();
-        } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
-          // portable SqlTypes.TIMESTAMP is backed by java.time.Instant
-          return getMicrosFromJavaInstant((java.time.Instant) value);
         } else {
           throw new RuntimeException("Unhandled logical type " + identifier);
         }
@@ -1188,19 +1137,10 @@ public class AvroUtils {
         } else {
           return convertDateTimeStrict((Long) value, fieldType);
         }
-      } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-        if (value instanceof java.time.Instant) {
-          return convertMicroMillisStrict(
-              getMicrosFromJavaInstant((java.time.Instant) value), fieldType);
-        } else {
-          return convertMicroMillisStrict((Long) value, fieldType);
-        }
       } else if (logicalType instanceof LogicalTypes.Date) {
         if (value instanceof ReadableInstant) {
           int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
           return convertDateStrict(epochDays, fieldType);
-        } else if (value instanceof java.time.LocalDate) {
-          return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType);
         } else {
           return convertDateStrict((Integer) value, fieldType);
         }
@@ -1258,14 +1198,6 @@ public class AvroUtils {
     }
   }
 
-  /** Helper method to get epoch micros required by Avro TimeStampMicros logical type. */
-  @SuppressWarnings("JavaInstantGetSecondsGetNano")
-  @VisibleForTesting
-  static long getMicrosFromJavaInstant(java.time.Instant value) {
-    return TimeUnit.SECONDS.toMicros(value.getEpochSecond())
-        + TimeUnit.NANOSECONDS.toMicros(value.getNano());
-  }
-
   private static Object convertRecordStrict(GenericRecord record, Schema.FieldType fieldType) {
     checkTypeName(fieldType.getTypeName(), Schema.TypeName.ROW, "record");
     return toBeamRowStrict(record, fieldType.getRowSchema());
@@ -1315,17 +1247,6 @@ public class AvroUtils {
     return new Instant(value);
   }
 
-  private static Object convertMicroMillisStrict(Long value, Schema.FieldType fieldType) {
-    checkTypeName(
-        fieldType.getTypeName(), TypeName.LOGICAL_TYPE, SqlTypes.TIMESTAMP.getIdentifier());
-    checkArgument(
-        fieldType.getLogicalType().getIdentifier().equals(SqlTypes.TIMESTAMP.getIdentifier()));
-
-    return java.time.Instant.ofEpochSecond(
-        TimeUnit.MICROSECONDS.toSeconds(value),
-        TimeUnit.MICROSECONDS.toNanos(Math.floorMod(value, TimeUnit.SECONDS.toMicros(1))));
-  }
-
   private static Object convertFloatStrict(Float value, Schema.FieldType fieldType) {
     checkTypeName(fieldType.getTypeName(), Schema.TypeName.FLOAT, "float");
     return value;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
index 0605aa6fd1c..3087959c1e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
@@ -29,7 +29,6 @@ import java.nio.ByteBuffer;
 import java.sql.JDBCType;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -47,7 +46,6 @@ import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
-import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.schemas.utils.AvroGenerators.RecordSchemaGenerator;
 import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
 import org.apache.beam.sdk.testing.CoderProperties;
@@ -231,12 +229,6 @@ public class AvroUtilsTest {
             LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
             "",
             (Object) null));
-    fields.add(
-        new org.apache.avro.Schema.Field(
-            "timestampMicros",
-            LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
-            "",
-            (Object) null));
     fields.add(new org.apache.avro.Schema.Field("row", getAvroSubSchema("row"), "", (Object) null));
     fields.add(
         new org.apache.avro.Schema.Field(
@@ -269,7 +261,6 @@ public class AvroUtilsTest {
         .addField(Field.of("bytes", FieldType.BYTES))
         .addField(Field.of("decimal", FieldType.DECIMAL))
         .addField(Field.of("timestampMillis", FieldType.DATETIME))
-        .addField(Field.of("timestampMicros", FieldType.logicalType(SqlTypes.TIMESTAMP)))
         .addField(Field.of("row", FieldType.row(subSchema)))
         .addField(Field.of("array", FieldType.array(FieldType.row(subSchema))))
         .addField(Field.of("map", FieldType.map(FieldType.STRING, FieldType.row(subSchema))))
@@ -279,9 +270,6 @@ public class AvroUtilsTest {
   private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4};
   private static final DateTime DATE_TIME =
       new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC);
-  private static final java.time.Instant MICROS_INSTANT =
-      java.time.Instant.ofEpochMilli(DATE_TIME.getMillis())
-          .plusNanos(TimeUnit.MICROSECONDS.toNanos(123));
   private static final BigDecimal BIG_DECIMAL = new BigDecimal(3600);
 
   private Row getBeamRow() {
@@ -296,7 +284,6 @@ public class AvroUtilsTest {
         .addValue(BYTE_ARRAY)
         .addValue(BIG_DECIMAL)
         .addValue(DATE_TIME)
-        .addValue(MICROS_INSTANT)
         .addValue(subRow)
         .addValue(ImmutableList.of(subRow, subRow))
         .addValue(ImmutableMap.of("k1", subRow, "k2", subRow))
@@ -329,7 +316,6 @@ public class AvroUtilsTest {
         .set("bytes", ByteBuffer.wrap(BYTE_ARRAY))
         .set("decimal", encodedDecimal)
         .set("timestampMillis", DATE_TIME.getMillis())
-        .set("timestampMicros", AvroUtils.getMicrosFromJavaInstant(MICROS_INSTANT))
         .set("row", getSubGenericRecord("row"))
         .set("array", ImmutableList.of(getSubGenericRecord("array"), getSubGenericRecord("array")))
         .set(
@@ -728,25 +714,6 @@ public class AvroUtilsTest {
     assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
   }
 
-  @Test
-  public void testSqlTypesToGenericRecord() {
-    // SqlTypes to LogicalTypes.date conversion are one direction
-    java.time.LocalDate localDate = java.time.LocalDate.of(1979, 3, 14);
-
-    Schema beamSchema =
-        Schema.builder()
-            .addField(Field.of("local_date", FieldType.logicalType(SqlTypes.DATE)))
-            .build();
-
-    Row rowData = Row.withSchema(beamSchema).addValue(localDate).build();
-
-    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
-    GenericRecord expectedRecord =
-        new GenericRecordBuilder(avroSchema).set("local_date", localDate.toEpochDay()).build();
-
-    assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
-  }
-
   @Test
   public void testBeamRowToGenericRecord() {
     GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);