You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2022/12/02 13:42:44 UTC

[beam] branch revert-23969-sqlttypes created (now e48b329e7a7)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a change to branch revert-23969-sqlttypes
in repository https://gitbox.apache.org/repos/asf/beam.git


      at e48b329e7a7 Revert "Support SqlTypes Date and Timestamp (MicrosInstant) in AvroUtils (#23969)"

This branch includes the following new commits:

     new e48b329e7a7 Revert "Support SqlTypes Date and Timestamp (MicrosInstant) in AvroUtils (#23969)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "Support SqlTypes Date and Timestamp (MicrosInstant) in AvroUtils (#23969)"

Posted by yh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch revert-23969-sqlttypes
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e48b329e7a7e868ed38f7e8e0973957d81ed8678
Author: Yi Hu <hu...@gmail.com>
AuthorDate: Fri Dec 2 08:42:34 2022 -0500

    Revert "Support SqlTypes Date and Timestamp (MicrosInstant) in AvroUtils (#23969)"
    
    This reverts commit d5d76b974592d45de368ab641647ca5cc4ec12ec.
---
 .../apache/beam/sdk/schemas/utils/AvroUtils.java   | 91 ++--------------------
 .../beam/sdk/schemas/utils/AvroUtilsTest.java      | 33 --------
 2 files changed, 6 insertions(+), 118 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index e61dbe505e2..8d01ed0406a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -34,7 +34,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
@@ -77,7 +76,6 @@ import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
-import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
@@ -89,7 +87,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -101,42 +98,7 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 
-/**
- * Utils to convert AVRO records to Beam rows. Imposes a mapping between common avro types and Beam
- * portable schemas (https://s.apache.org/beam-schemas):
- *
- * <pre>
- *   Avro                Beam Field Type
- *   INT         <-----> INT32
- *   LONG        <-----> INT64
- *   FLOAT       <-----> FLOAT
- *   DOUBLE      <-----> DOUBLE
- *   BOOLEAN     <-----> BOOLEAN
- *   STRING      <-----> STRING
- *   BYTES       <-----> BYTES
- *               <------ LogicalType(urn="beam:logical_type:var_bytes:v1")
- *   FIXED       <-----> LogicalType(urn="beam:logical_type:fixed_bytes:v1")
- *   ARRAY       <-----> ARRAY
- *   ENUM        <-----> LogicalType(EnumerationType)
- *   MAP         <-----> MAP
- *   RECORD      <-----> ROW
- *   UNION       <-----> LogicalType(OneOfType)
- *   LogicalTypes.Date              <-----> LogicalType(DATE)
- *                                  <------ LogicalType(urn="beam:logical_type:date:v1")
- *   LogicalTypes.TimestampMillis   <-----> DATETIME
- *   LogicalTypes.TimestampMicros   <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
- *   LogicalTypes.Decimal           <-----> DECIMAL
- * </pre>
- *
- * For SQL CHAR/VARCHAR types, an Avro schema
- *
- * <pre>
- *   LogicalType({"type":"string","logicalType":"char","maxLength":MAX_LENGTH}) or
- *   LogicalType({"type":"string","logicalType":"varchar","maxLength":MAX_LENGTH})
- * </pre>
- *
- * is used.
- */
+/** Utils to convert AVRO records to Beam rows. */
 @Experimental(Kind.SCHEMAS)
 @SuppressWarnings({
   "nullness", // TODO(https://github.com/apache/beam/issues/20497)
@@ -810,11 +772,9 @@ public class AvroUtils {
       if (logicalType instanceof LogicalTypes.Decimal) {
         fieldType = FieldType.DECIMAL;
       } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
-        // TODO(https://github.com/apache/beam/issues/19215) DATETIME primitive will be removed when
-        // fully migrates to java.time lib from joda-time
+        // TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
+        // this is done, this logical type needs to be changed.
         fieldType = FieldType.DATETIME;
-      } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-        fieldType = FieldType.logicalType(SqlTypes.TIMESTAMP);
       } else if (logicalType instanceof LogicalTypes.Date) {
         fieldType = FieldType.DATETIME;
       }
@@ -927,8 +887,8 @@ public class AvroUtils {
         break;
 
       case DATETIME:
-        // TODO(https://github.com/apache/beam/issues/19215) DATETIME primitive will be removed when
-        // fully migrates to java.time lib from joda-time
+        // TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
+        // this is done, this logical type needs to be changed.
         baseType =
             LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG));
         break;
@@ -973,13 +933,10 @@ public class AvroUtils {
                   oneOfType.getOneOfSchema().getFields().stream()
                       .map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
                       .collect(Collectors.toList()));
-        } else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) {
+        } else if ("DATE".equals(identifier)) {
           baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
         } else if ("TIME".equals(identifier)) {
           baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
-        } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
-          baseType =
-              LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
         } else {
           throw new RuntimeException(
               "Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
@@ -1104,17 +1061,9 @@ public class AvroUtils {
                 oneOfValue.getValue());
           }
         } else if ("DATE".equals(identifier)) {
-          // "Date" is backed by joda.time.Instant
           return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays();
-        } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
-          // portable SqlTypes.DATE is backed by java.time.LocalDate
-          return ((java.time.LocalDate) value).toEpochDay();
         } else if ("TIME".equals(identifier)) {
-          // "TIME" is backed by joda.time.Instant
           return (int) ((Instant) value).getMillis();
-        } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
-          // portable SqlTypes.TIMESTAMP is backed by java.time.Instant
-          return getMicrosFromJavaInstant((java.time.Instant) value);
         } else {
           throw new RuntimeException("Unhandled logical type " + identifier);
         }
@@ -1188,19 +1137,10 @@ public class AvroUtils {
         } else {
           return convertDateTimeStrict((Long) value, fieldType);
         }
-      } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-        if (value instanceof java.time.Instant) {
-          return convertMicroMillisStrict(
-              getMicrosFromJavaInstant((java.time.Instant) value), fieldType);
-        } else {
-          return convertMicroMillisStrict((Long) value, fieldType);
-        }
       } else if (logicalType instanceof LogicalTypes.Date) {
         if (value instanceof ReadableInstant) {
           int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
           return convertDateStrict(epochDays, fieldType);
-        } else if (value instanceof java.time.LocalDate) {
-          return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType);
         } else {
           return convertDateStrict((Integer) value, fieldType);
         }
@@ -1258,14 +1198,6 @@ public class AvroUtils {
     }
   }
 
-  /** Helper method to get epoch micros required by Avro TimeStampMicros logical type. */
-  @SuppressWarnings("JavaInstantGetSecondsGetNano")
-  @VisibleForTesting
-  static long getMicrosFromJavaInstant(java.time.Instant value) {
-    return TimeUnit.SECONDS.toMicros(value.getEpochSecond())
-        + TimeUnit.NANOSECONDS.toMicros(value.getNano());
-  }
-
   private static Object convertRecordStrict(GenericRecord record, Schema.FieldType fieldType) {
     checkTypeName(fieldType.getTypeName(), Schema.TypeName.ROW, "record");
     return toBeamRowStrict(record, fieldType.getRowSchema());
@@ -1315,17 +1247,6 @@ public class AvroUtils {
     return new Instant(value);
   }
 
-  private static Object convertMicroMillisStrict(Long value, Schema.FieldType fieldType) {
-    checkTypeName(
-        fieldType.getTypeName(), TypeName.LOGICAL_TYPE, SqlTypes.TIMESTAMP.getIdentifier());
-    checkArgument(
-        fieldType.getLogicalType().getIdentifier().equals(SqlTypes.TIMESTAMP.getIdentifier()));
-
-    return java.time.Instant.ofEpochSecond(
-        TimeUnit.MICROSECONDS.toSeconds(value),
-        TimeUnit.MICROSECONDS.toNanos(Math.floorMod(value, TimeUnit.SECONDS.toMicros(1))));
-  }
-
   private static Object convertFloatStrict(Float value, Schema.FieldType fieldType) {
     checkTypeName(fieldType.getTypeName(), Schema.TypeName.FLOAT, "float");
     return value;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
index 0605aa6fd1c..3087959c1e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
@@ -29,7 +29,6 @@ import java.nio.ByteBuffer;
 import java.sql.JDBCType;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -47,7 +46,6 @@ import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
-import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.schemas.utils.AvroGenerators.RecordSchemaGenerator;
 import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
 import org.apache.beam.sdk.testing.CoderProperties;
@@ -231,12 +229,6 @@ public class AvroUtilsTest {
             LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
             "",
             (Object) null));
-    fields.add(
-        new org.apache.avro.Schema.Field(
-            "timestampMicros",
-            LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
-            "",
-            (Object) null));
     fields.add(new org.apache.avro.Schema.Field("row", getAvroSubSchema("row"), "", (Object) null));
     fields.add(
         new org.apache.avro.Schema.Field(
@@ -269,7 +261,6 @@ public class AvroUtilsTest {
         .addField(Field.of("bytes", FieldType.BYTES))
         .addField(Field.of("decimal", FieldType.DECIMAL))
         .addField(Field.of("timestampMillis", FieldType.DATETIME))
-        .addField(Field.of("timestampMicros", FieldType.logicalType(SqlTypes.TIMESTAMP)))
         .addField(Field.of("row", FieldType.row(subSchema)))
         .addField(Field.of("array", FieldType.array(FieldType.row(subSchema))))
         .addField(Field.of("map", FieldType.map(FieldType.STRING, FieldType.row(subSchema))))
@@ -279,9 +270,6 @@ public class AvroUtilsTest {
   private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4};
   private static final DateTime DATE_TIME =
       new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC);
-  private static final java.time.Instant MICROS_INSTANT =
-      java.time.Instant.ofEpochMilli(DATE_TIME.getMillis())
-          .plusNanos(TimeUnit.MICROSECONDS.toNanos(123));
   private static final BigDecimal BIG_DECIMAL = new BigDecimal(3600);
 
   private Row getBeamRow() {
@@ -296,7 +284,6 @@ public class AvroUtilsTest {
         .addValue(BYTE_ARRAY)
         .addValue(BIG_DECIMAL)
         .addValue(DATE_TIME)
-        .addValue(MICROS_INSTANT)
         .addValue(subRow)
         .addValue(ImmutableList.of(subRow, subRow))
         .addValue(ImmutableMap.of("k1", subRow, "k2", subRow))
@@ -329,7 +316,6 @@ public class AvroUtilsTest {
         .set("bytes", ByteBuffer.wrap(BYTE_ARRAY))
         .set("decimal", encodedDecimal)
         .set("timestampMillis", DATE_TIME.getMillis())
-        .set("timestampMicros", AvroUtils.getMicrosFromJavaInstant(MICROS_INSTANT))
         .set("row", getSubGenericRecord("row"))
         .set("array", ImmutableList.of(getSubGenericRecord("array"), getSubGenericRecord("array")))
         .set(
@@ -728,25 +714,6 @@ public class AvroUtilsTest {
     assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
   }
 
-  @Test
-  public void testSqlTypesToGenericRecord() {
-    // SqlTypes to LogicalTypes.date conversion are one direction
-    java.time.LocalDate localDate = java.time.LocalDate.of(1979, 3, 14);
-
-    Schema beamSchema =
-        Schema.builder()
-            .addField(Field.of("local_date", FieldType.logicalType(SqlTypes.DATE)))
-            .build();
-
-    Row rowData = Row.withSchema(beamSchema).addValue(localDate).build();
-
-    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
-    GenericRecord expectedRecord =
-        new GenericRecordBuilder(avroSchema).set("local_date", localDate.toEpochDay()).build();
-
-    assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
-  }
-
   @Test
   public void testBeamRowToGenericRecord() {
     GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);