Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/09 04:30:36 UTC

[GitHub] [beam] reuvenlax opened a new pull request #11074: Store logical type values in Row instead of base values

reuvenlax opened a new pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074
 
 
   @kanterov @alexvanboxel I believe this PR will fix the issues you've both had with logical types.
   
   After some thought, I think we're better off storing the logical type value in the Row object. It potentially makes SchemaCoder a tiny bit slower, but this is unlikely to be noticeable. One bonus: storage for OneOf types becomes far more memory efficient, as we no longer need to store the entire row.
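
A minimal, Beam-free sketch of the idea under a simplified model. `SimpleLogicalType`, `ColorType`, and `RowSketch` are hypothetical names, not Beam classes. The row keeps the user-facing (input) value and converts to the base value only on demand, for example when an encoder needs it.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a logical type: a user-facing input type backed by a base type. */
interface SimpleLogicalType<InputT, BaseT> {
  BaseT toBaseType(InputT input);

  InputT toInputType(BaseT base);
}

/** Hypothetical enum-like logical type whose base representation is an int ordinal. */
final class ColorType implements SimpleLogicalType<String, Integer> {
  private static final List<String> NAMES = Arrays.asList("RED", "GREEN", "BLUE");

  @Override
  public Integer toBaseType(String input) {
    int idx = NAMES.indexOf(input);
    if (idx < 0) {
      throw new IllegalArgumentException("Unknown value: " + input);
    }
    return idx;
  }

  @Override
  public String toInputType(Integer base) {
    return NAMES.get(base);
  }
}

/** Hypothetical single-field row that stores the logical (input) value directly. */
final class RowSketch<InputT, BaseT> {
  private final SimpleLogicalType<InputT, BaseT> type;
  private final InputT value;

  RowSketch(SimpleLogicalType<InputT, BaseT> type, InputT value) {
    this.type = type;
    this.value = value;
  }

  /** No conversion: the stored value is returned as-is. */
  InputT getLogicalTypeValue() {
    return value;
  }

  /** Conversion happens only when asked, e.g. by an encoder. */
  BaseT getBaseValue() {
    return type.toBaseType(value);
  }
}
```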

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389644249
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
 ##########
 @@ -620,6 +531,88 @@ public void testGloballyWithSchemaAggregateFnNestedFields() {
     pipeline.run();
   }
 
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class BasicEnum {
+    enum Test {
+      ZERO,
+      ONE,
+      TWO
+    };
+
+    abstract String getKey();
+
+    abstract Test getEnumeration();
+
+    static BasicEnum of(String key, Test value) {
+      return new AutoValue_GroupTest_BasicEnum(key, value);
+    }
+  }
+
+  static final EnumerationType BASIC_ENUM_ENUMERATION =
+      EnumerationType.create("ZERO", "ONE", "TWO");
+  static final Schema BASIC_ENUM_SCHEMA =
+      Schema.builder()
+          .addStringField("key")
+          .addLogicalTypeField("enumeration", BASIC_ENUM_ENUMERATION)
+          .build();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregateBaseValuesGlobally() {
+    Collection<BasicEnum> elements =
+        Lists.newArrayList(
+            BasicEnum.of("a", BasicEnum.Test.ONE), BasicEnum.of("a", BasicEnum.Test.TWO));
+
+    PCollection<Row> aggregate =
+        pipeline
+            .apply(Create.of(elements))
+            .apply(
+                Group.<BasicEnum>globally()
+                    .aggregateFieldBaseValue("enumeration", Sum.ofIntegers(), "enum_sum"));
 
 Review comment:
   It looks rather unexpected that the combiner accepts an `int` as an argument instead of an `EnumerationType.Value`.

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389648327
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
 ##########
 @@ -202,10 +202,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
   }
 
   /** A {@link CombineFn} that combines into a {@link List} of up to limit elements. */
-  private static class SampleAnyCombineFn<T> extends CombineFn<T, List<T>, Iterable<T>> {
+  public static class SampleAnyCombineFn<T> extends CombineFn<T, List<T>, Iterable<T>> {
 
 Review comment:
   nit: this doesn't look necessary; we can probably replicate this combiner in tests instead of extending the public API.
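
If the combiner were replicated in tests, a minimal Beam-free version might look like the sketch below. `SampleAnySketch` is a hypothetical name; its method names mirror those of Beam's `CombineFn`, but it does not extend it, and it only illustrates "keep up to `limit` arbitrary elements" semantics.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical test-only combiner that keeps up to {@code limit} arbitrary elements. */
final class SampleAnySketch<T> {
  private final int limit;

  SampleAnySketch(int limit) {
    this.limit = limit;
  }

  List<T> createAccumulator() {
    return new ArrayList<>();
  }

  List<T> addInput(List<T> acc, T input) {
    // Inputs arriving after the limit is reached are dropped.
    if (acc.size() < limit) {
      acc.add(input);
    }
    return acc;
  }

  List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
    List<T> merged = createAccumulator();
    for (List<T> acc : accumulators) {
      for (T element : acc) {
        if (merged.size() >= limit) {
          return merged;
        }
        merged.add(element);
      }
    }
    return merged;
  }

  Iterable<T> extractOutput(List<T> acc) {
    return acc;
  }
}
```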

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602705849
 
 
   Run Java PreCommit

[GitHub] [beam] alexvanboxel commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
alexvanboxel commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r396074521
 
 

 ##########
 File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
 ##########
 @@ -396,8 +396,8 @@ public void testLogicalTypes() {
 
     PCollection<Row> outputRow =
         pipeline
-            .apply(Create.of(row))
-            .setRowSchema(outputRowSchema)
+            .apply(Create.of(row).withRowSchema(inputRowSchema))
+            //  .setRowSchema(outputRowSchema)
 
 Review comment:
   Remove the commented-out line.

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389652202
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
 ##########
 @@ -99,37 +97,14 @@
 @Experimental(Kind.SCHEMAS)
 public abstract class RowCoderGenerator {
   private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
-  private static final ForLoadedType CODER_TYPE = new ForLoadedType(Coder.class);
-  private static final ForLoadedType LIST_CODER_TYPE = new ForLoadedType(ListCoder.class);
-  private static final ForLoadedType ITERABLE_CODER_TYPE = new ForLoadedType(IterableCoder.class);
-  private static final ForLoadedType MAP_CODER_TYPE = new ForLoadedType(MapCoder.class);
   private static final BitSetCoder NULL_LIST_CODER = BitSetCoder.of();
   private static final VarIntCoder VAR_INT_CODER = VarIntCoder.of();
-  private static final ForLoadedType NULLABLE_CODER = new ForLoadedType(NullableCoder.class);
 
   private static final String CODERS_FIELD_NAME = "FIELD_CODERS";
 
-  // A map of primitive types -> StackManipulations to create their coders.
-  private static final Map<TypeName, StackManipulation> CODER_MAP;
-
   // Cache for Coder class that are already generated.
   private static Map<UUID, Coder<Row>> generatedCoders = Maps.newConcurrentMap();
 
 Review comment:
   nit: not directly relevant to this PR, but I guess this field should be final, and its name should follow the naming convention for constants.
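
A minimal sketch of the suggestion, assuming a simplified cache that stores strings instead of generated `Coder<Row>` instances (`CoderCacheSketch` is hypothetical). Making the field `final` prevents reassignment; the concurrent map itself remains mutable.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache keyed by schema UUID; strings stand in for generated coders. */
final class CoderCacheSketch {
  // final: the reference cannot be reassigned. UPPER_SNAKE_CASE: the usual name for a
  // static final field. The ConcurrentHashMap itself stays mutable and thread-safe.
  private static final Map<UUID, String> GENERATED_CODERS = new ConcurrentHashMap<>();

  private CoderCacheSketch() {}

  /** Returns the cached entry for this ID, running the generator only if it is absent. */
  static String getOrGenerate(UUID schemaId, Function<UUID, String> generator) {
    return GENERATED_CODERS.computeIfAbsent(schemaId, generator);
  }
}
```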

[GitHub] [beam] reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r396586669
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java
 ##########
 @@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianShortCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.FloatCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.ReadableInstant;
+
+class SchemaCoderHelpers {
+  // This contains a map of primitive types to their coders.
+  private static final Map<TypeName, Coder> CODER_MAP =
+      ImmutableMap.<TypeName, Coder>builder()
+          .put(TypeName.BYTE, ByteCoder.of())
+          .put(TypeName.BYTES, ByteArrayCoder.of())
+          .put(TypeName.INT16, BigEndianShortCoder.of())
+          .put(TypeName.INT32, VarIntCoder.of())
+          .put(TypeName.INT64, VarLongCoder.of())
+          .put(TypeName.DECIMAL, BigDecimalCoder.of())
+          .put(TypeName.FLOAT, FloatCoder.of())
+          .put(TypeName.DOUBLE, DoubleCoder.of())
+          .put(TypeName.STRING, StringUtf8Coder.of())
+          .put(TypeName.DATETIME, InstantCoder.of())
+          .put(TypeName.BOOLEAN, BooleanCoder.of())
+          .build();
+
+  private static class LogicalTypeCoder<InputT, BaseT> extends Coder<InputT> {
+    private final LogicalType<InputT, BaseT> logicalType;
+    private final Coder<BaseT> baseTypeCoder;
+    private final boolean isDateTime;
+
+    LogicalTypeCoder(LogicalType<InputT, BaseT> logicalType, Coder baseTypeCoder) {
+      this.logicalType = logicalType;
+      this.baseTypeCoder = baseTypeCoder;
+      this.isDateTime = logicalType.getBaseType().equals(FieldType.DATETIME);
+    }
+
+    @Override
+    public void encode(InputT value, OutputStream outStream) throws CoderException, IOException {
+      BaseT baseType = logicalType.toBaseType(value);
+      if (isDateTime) {
+        baseType = (BaseT) ((ReadableInstant) baseType).toInstant();
 
 Review comment:
   It's to maintain the current invariant that any ReadableInstant can be passed in, while the current InstantCoder requires an Instant. This used to be enforced in the Row builder because we would call toBaseType there.
   
   I think we need to redo the DateTime types (what we really have is a timestamp type, not a datetime type), but until then I wanted to maintain the existing behavior.
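
The wrapping pattern described here (convert to the base type, normalize it to the exact class the base coder accepts, then delegate) can be sketched without Beam or Joda-Time. All names below are hypothetical; `CharSequence` to `String` stands in for `ReadableInstant` to `Instant`, and `DataOutputStream.writeUTF` stands in for the real base coder.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical minimal coder interface (not Beam's Coder). */
interface SketchCoder<T> {
  void encode(T value, DataOutputStream out) throws IOException;

  T decode(DataInputStream in) throws IOException;
}

/** Base coder that only accepts String, the way InstantCoder only accepts Instant. */
final class Utf8Coder implements SketchCoder<String> {
  @Override
  public void encode(String value, DataOutputStream out) throws IOException {
    out.writeUTF(value);
  }

  @Override
  public String decode(DataInputStream in) throws IOException {
    return in.readUTF();
  }
}

/** Hypothetical logical type whose toBaseType may return any CharSequence implementation. */
interface CharSeqLogicalType<InputT> {
  CharSequence toBaseType(InputT input);

  InputT toInputType(String base);
}

/** Wraps a base coder: convert to the base type, normalize, then delegate. */
final class LogicalTypeCoderSketch<InputT> implements SketchCoder<InputT> {
  private final CharSeqLogicalType<InputT> logicalType;
  private final SketchCoder<String> baseCoder;

  LogicalTypeCoderSketch(CharSeqLogicalType<InputT> logicalType, SketchCoder<String> baseCoder) {
    this.logicalType = logicalType;
    this.baseCoder = baseCoder;
  }

  @Override
  public void encode(InputT value, DataOutputStream out) throws IOException {
    // Normalize to the exact type the base coder accepts (analogous to calling
    // toInstant() on a ReadableInstant before handing it to InstantCoder).
    String base = logicalType.toBaseType(value).toString();
    baseCoder.encode(base, out);
  }

  @Override
  public InputT decode(DataInputStream in) throws IOException {
    return logicalType.toInputType(baseCoder.decode(in));
  }
}
```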

[GitHub] [beam] reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r390766143
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
 ##########
 @@ -202,10 +202,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
   }
 
   /** A {@link CombineFn} that combines into a {@link List} of up to limit elements. */
-  private static class SampleAnyCombineFn<T> extends CombineFn<T, List<T>, Iterable<T>> {
+  public static class SampleAnyCombineFn<T> extends CombineFn<T, List<T>, Iterable<T>> {
 
 Review comment:
   Sample.anyCombineFn() already exists, so I reverted this change.

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389647574
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
 ##########
 @@ -362,7 +371,24 @@ public Boolean getBoolean(int idx) {
   @Nullable
   public <T> T getLogicalTypeValue(int idx, Class<T> clazz) {
     LogicalType logicalType = checkNotNull(getSchema().getField(idx).getType().getLogicalType());
-    return (T) logicalType.toInputType(getValue(idx));
+    return (T) getValue(idx);
+  }
+
+  /**
+   * Returns the Logical Type base type for this field. If there are multiple nested logical types,
+   * they are all resolved to the first non-logical type. {@link IllegalStateException} is thrown if
+   * schema doesn't match.
+   */
+  @Nullable
+  public <T> T getLogicalTypeBaseValue(int idx, Class<T> clazz) {
+    Object value = getValue(idx);
+    FieldType fieldType = getSchema().getField(idx).getType();
+    checkArgument(fieldType.getTypeName().isLogicalType());
+    while (fieldType.getTypeName().isLogicalType()) {
+      value = fieldType.getLogicalType().toBaseType(value);
 
 Review comment:
   Should `toBaseType` handle null values for nullable fields or not? Can we assume that `toBaseType(null) == null`?

[GitHub] [beam] alexvanboxel commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
alexvanboxel commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-601111384
 
 
   > @alexvanboxel this will conflict badly with your PR I think (as you moved code into a new file, which makes merges tricky) so we need to be quite careful with how we merge these PRs.
   
   No more comments; I like the consistency. We're using some logical types in our pipelines, so I will make a custom build before the release to check that everything works.
   
   I merged my PR into master as soon as I saw the LGTM, so I think the easiest approach is to rebase this branch onto master and resolve the schema conflicts. I'll review as soon as that is done.

[GitHub] [beam] alexvanboxel commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
alexvanboxel commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-596948465
 
 
   > We could work around this by always converting to the base type inside of equals and hashCode, though that might be a bit expensive. However value types without a proper equals are generally discouraged in Java, so I think we can simply document that equals is required.
   
   Documenting the requirement seems reasonable. 
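
A small sketch of why the documented `equals` requirement matters, using a hypothetical `Fingerprint` value type that wraps a byte array (Java arrays use identity equality). The hypothetical `BaseValueEquality` helper shows the workaround quoted above of comparing base values instead, which costs a conversion on every call.

```java
import java.util.Arrays;

/** Hypothetical logical-type value that wraps a byte array. */
final class Fingerprint {
  private final byte[] bytes;

  Fingerprint(byte[] bytes) {
    this.bytes = bytes.clone();
  }

  /** Hypothetical conversion to the base value. */
  byte[] toBase() {
    return bytes.clone();
  }

  // Without these overrides, two Fingerprints holding identical bytes would compare
  // unequal, and so would any rows that store them directly.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof Fingerprint)) {
      return false;
    }
    return Arrays.equals(bytes, ((Fingerprint) o).bytes);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(bytes);
  }
}

/** The alternative: compare base values, paying for a conversion on each comparison. */
final class BaseValueEquality {
  private BaseValueEquality() {}

  static boolean equalByBase(Fingerprint a, Fingerprint b) {
    return Arrays.equals(a.toBase(), b.toBase());
  }
}
```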

[GitHub] [beam] alexvanboxel commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
alexvanboxel commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-596367911
 
 
   I can only review the PR this evening, but the first question that comes to mind is:
   What traits does a logical type need in order to play nicely with Beam?
   - Does it need to be comparable?
   - Does its equals need to behave properly?
   We have control over these things when storing and working with the base type.
   
    

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602824816
 
 
   Run Java PreCommit

[GitHub] [beam] reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r390766044
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
 ##########
 @@ -362,7 +371,24 @@ public Boolean getBoolean(int idx) {
   @Nullable
   public <T> T getLogicalTypeValue(int idx, Class<T> clazz) {
     LogicalType logicalType = checkNotNull(getSchema().getField(idx).getType().getLogicalType());
-    return (T) logicalType.toInputType(getValue(idx));
+    return (T) getValue(idx);
+  }
+
+  /**
+   * Returns the Logical Type base type for this field. If there are multiple nested logical types,
+   * they are all resolved to the first non-logical type. {@link IllegalStateException} is thrown if
+   * schema doesn't match.
+   */
+  @Nullable
+  public <T> T getLogicalTypeBaseValue(int idx, Class<T> clazz) {
+    Object value = getValue(idx);
+    FieldType fieldType = getSchema().getField(idx).getType();
+    checkArgument(fieldType.getTypeName().isLogicalType());
+    while (fieldType.getTypeName().isLogicalType()) {
+      value = fieldType.getLogicalType().toBaseType(value);
 
 Review comment:
   good catch, we should.
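
A sketch of the null-safe resolution agreed on here, using hypothetical stand-ins (`FieldTypeSketch`, `BaseValues`) rather than Beam's `FieldType` and `Row`. It assumes a null logical value should simply map to a null base value, so `toBaseType` is never called with null.

```java
import java.util.function.Function;

/** Hypothetical field type: a primitive, or a logical type layered on another field type. */
final class FieldTypeSketch {
  final String name;
  final Function<Object, Object> toBaseType; // null for non-logical types
  final FieldTypeSketch baseType; // null for non-logical types

  private FieldTypeSketch(
      String name, Function<Object, Object> toBaseType, FieldTypeSketch baseType) {
    this.name = name;
    this.toBaseType = toBaseType;
    this.baseType = baseType;
  }

  static FieldTypeSketch primitive(String name) {
    return new FieldTypeSketch(name, null, null);
  }

  static FieldTypeSketch logical(
      String name, Function<Object, Object> toBaseType, FieldTypeSketch baseType) {
    return new FieldTypeSketch(name, toBaseType, baseType);
  }

  boolean isLogicalType() {
    return toBaseType != null;
  }
}

final class BaseValues {
  private BaseValues() {}

  /** Resolves nested logical types down to the first non-logical type; null maps to null. */
  static Object resolveBaseValue(Object value, FieldTypeSketch fieldType) {
    if (!fieldType.isLogicalType()) {
      throw new IllegalArgumentException("Not a logical type: " + fieldType.name);
    }
    while (fieldType.isLogicalType()) {
      if (value == null) {
        // Nullable field: never pass null into toBaseType.
        return null;
      }
      value = fieldType.toBaseType.apply(value);
      fieldType = fieldType.baseType;
    }
    return value;
  }
}
```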

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602419503
 
 
   Run Java PreCommit

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602935452
 
 
   After 8 runs, the only Java Precommit failures have been random flakes (e.g. in Flink tests). 

[GitHub] [beam] reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r390765162
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
 ##########
 @@ -620,6 +531,88 @@ public void testGloballyWithSchemaAggregateFnNestedFields() {
     pipeline.run();
   }
 
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class BasicEnum {
+    enum Test {
+      ZERO,
+      ONE,
+      TWO
+    };
+
+    abstract String getKey();
+
+    abstract Test getEnumeration();
+
+    static BasicEnum of(String key, Test value) {
+      return new AutoValue_GroupTest_BasicEnum(key, value);
+    }
+  }
+
+  static final EnumerationType BASIC_ENUM_ENUMERATION =
+      EnumerationType.create("ZERO", "ONE", "TWO");
+  static final Schema BASIC_ENUM_SCHEMA =
+      Schema.builder()
+          .addStringField("key")
+          .addLogicalTypeField("enumeration", BASIC_ENUM_ENUMERATION)
+          .build();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregateBaseValuesGlobally() {
+    Collection<BasicEnum> elements =
+        Lists.newArrayList(
+            BasicEnum.of("a", BasicEnum.Test.ONE), BasicEnum.of("a", BasicEnum.Test.TWO));
+
+    PCollection<Row> aggregate =
+        pipeline
+            .apply(Create.of(elements))
+            .apply(
+                Group.<BasicEnum>globally()
+                    .aggregateFieldBaseValue("enumeration", Sum.ofIntegers(), "enum_sum"));
 
 Review comment:
   It's because aggregateFieldBaseValue is called instead of aggregateField. Would a different name make it clearer?
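
To illustrate the distinction, here is a Beam-free sketch with hypothetical `EnumSketch` and `GroupSketch` classes (not Beam's `Group` or `EnumerationType`). With the base-value variant, the combine function receives the int ordinals behind the enum, which is why an integer sum type-checks; with the logical-value variant, it receives the enum names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical enumeration logical type: names backed by int ordinals. */
final class EnumSketch {
  private final List<String> names;

  EnumSketch(String... names) {
    this.names = Arrays.asList(names);
  }

  int toBaseType(String name) {
    int ordinal = names.indexOf(name);
    if (ordinal < 0) {
      throw new IllegalArgumentException("Unknown enum value: " + name);
    }
    return ordinal;
  }
}

/** Hypothetical helpers contrasting logical-value and base-value aggregation. */
final class GroupSketch {
  private GroupSketch() {}

  /** The combine function sees the logical values (enum names). */
  static <R> R aggregateField(List<String> values, Function<List<String>, R> combineFn) {
    return combineFn.apply(values);
  }

  /** The combine function sees the base values (int ordinals). */
  static <R> R aggregateFieldBaseValue(
      List<String> values, EnumSketch type, Function<List<Integer>, R> combineFn) {
    List<Integer> baseValues = new ArrayList<>();
    for (String value : values) {
      baseValues.add(type.toBaseType(value));
    }
    return combineFn.apply(baseValues);
  }
}
```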

[GitHub] [beam] reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r391253578
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java
 ##########
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianShortCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.FloatCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+public class SchemaCoderHelpers {
+  // This contains a map of primitive types to their coders.
+  public static final Map<TypeName, Coder> CODER_MAP =
+      ImmutableMap.<TypeName, Coder>builder()
+          .put(TypeName.BYTE, ByteCoder.of())
+          .put(TypeName.BYTES, ByteArrayCoder.of())
+          .put(TypeName.INT16, BigEndianShortCoder.of())
+          .put(TypeName.INT32, VarIntCoder.of())
+          .put(TypeName.INT64, VarLongCoder.of())
+          .put(TypeName.DECIMAL, BigDecimalCoder.of())
+          .put(TypeName.FLOAT, FloatCoder.of())
+          .put(TypeName.DOUBLE, DoubleCoder.of())
+          .put(TypeName.STRING, StringUtf8Coder.of())
+          .put(TypeName.DATETIME, InstantCoder.of())
+          .put(TypeName.BOOLEAN, BooleanCoder.of())
+          .build();
+
+  private static class LogicalTypeCoder extends Coder {
 
 Review comment:
   Made these changes.
   
   We should probably update SchemaCoder as well to do this properly, but that can be in a different PR IMO.

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602864720
 
 
   Run Java PreCommit

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-596328754
 
 
   R: @kanterov 

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-597724269
 
 
   Run Java PreCommit

[GitHub] [beam] reuvenlax merged pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax merged pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074
 
 
   

[GitHub] [beam] reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r390766194
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
 ##########
 @@ -119,27 +86,6 @@ protected SchemaCoder(
     return RowCoder.of(schema);
   }
 
-  /** Returns the coder used for a given primitive type. */
-  public static <T> Coder<T> coderForFieldType(FieldType fieldType) {
 
 Review comment:
   done

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602887176
 
 
   Run Java PreCommit

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602786238
 
 
   Run Java PreCommit

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r391093843
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
 ##########
 @@ -620,6 +531,88 @@ public void testGloballyWithSchemaAggregateFnNestedFields() {
     pipeline.run();
   }
 
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class BasicEnum {
+    enum Test {
+      ZERO,
+      ONE,
+      TWO
+    };
+
+    abstract String getKey();
+
+    abstract Test getEnumeration();
+
+    static BasicEnum of(String key, Test value) {
+      return new AutoValue_GroupTest_BasicEnum(key, value);
+    }
+  }
+
+  static final EnumerationType BASIC_ENUM_ENUMERATION =
+      EnumerationType.create("ZERO", "ONE", "TWO");
+  static final Schema BASIC_ENUM_SCHEMA =
+      Schema.builder()
+          .addStringField("key")
+          .addLogicalTypeField("enumeration", BASIC_ENUM_ENUMERATION)
+          .build();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregateBaseValuesGlobally() {
+    Collection<BasicEnum> elements =
+        Lists.newArrayList(
+            BasicEnum.of("a", BasicEnum.Test.ONE), BasicEnum.of("a", BasicEnum.Test.TWO));
+
+    PCollection<Row> aggregate =
+        pipeline
+            .apply(Create.of(elements))
+            .apply(
+                Group.<BasicEnum>globally()
+                    .aggregateFieldBaseValue("enumeration", Sum.ofIntegers(), "enum_sum"));
 
 Review comment:
   I see. I didn't get it at first, but now it all makes sense.

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r396588436
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java
 ##########
 @@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianShortCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.FloatCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.ReadableInstant;
+
+class SchemaCoderHelpers {
+  // This contains a map of primitive types to their coders.
+  private static final Map<TypeName, Coder> CODER_MAP =
+      ImmutableMap.<TypeName, Coder>builder()
+          .put(TypeName.BYTE, ByteCoder.of())
+          .put(TypeName.BYTES, ByteArrayCoder.of())
+          .put(TypeName.INT16, BigEndianShortCoder.of())
+          .put(TypeName.INT32, VarIntCoder.of())
+          .put(TypeName.INT64, VarLongCoder.of())
+          .put(TypeName.DECIMAL, BigDecimalCoder.of())
+          .put(TypeName.FLOAT, FloatCoder.of())
+          .put(TypeName.DOUBLE, DoubleCoder.of())
+          .put(TypeName.STRING, StringUtf8Coder.of())
+          .put(TypeName.DATETIME, InstantCoder.of())
+          .put(TypeName.BOOLEAN, BooleanCoder.of())
+          .build();
+
+  private static class LogicalTypeCoder<InputT, BaseT> extends Coder<InputT> {
+    private final LogicalType<InputT, BaseT> logicalType;
+    private final Coder<BaseT> baseTypeCoder;
+    private final boolean isDateTime;
+
+    LogicalTypeCoder(LogicalType<InputT, BaseT> logicalType, Coder baseTypeCoder) {
+      this.logicalType = logicalType;
+      this.baseTypeCoder = baseTypeCoder;
+      this.isDateTime = logicalType.getBaseType().equals(FieldType.DATETIME);
+    }
+
+    @Override
+    public void encode(InputT value, OutputStream outStream) throws CoderException, IOException {
+      BaseT baseType = logicalType.toBaseType(value);
+      if (isDateTime) {
+        baseType = (BaseT) ((ReadableInstant) baseType).toInstant();
 
 Review comment:
   Thanks. Makes sense. Agree that DateTime types need to be redone.

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-600982415
 
 
   @alexvanboxel @kanterov any more comments?
   
   @alexvanboxel I think this will conflict badly with your PR (you moved code into a new file, which makes merges tricky), so we need to be quite careful with how we merge these PRs.

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602400974
 
 
   Run SQL Postcommit

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389645005
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
 ##########
 @@ -620,6 +531,88 @@ public void testGloballyWithSchemaAggregateFnNestedFields() {
     pipeline.run();
   }
 
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class BasicEnum {
+    enum Test {
+      ZERO,
+      ONE,
+      TWO
+    };
+
+    abstract String getKey();
+
+    abstract Test getEnumeration();
+
+    static BasicEnum of(String key, Test value) {
+      return new AutoValue_GroupTest_BasicEnum(key, value);
+    }
+  }
+
+  static final EnumerationType BASIC_ENUM_ENUMERATION =
+      EnumerationType.create("ZERO", "ONE", "TWO");
+  static final Schema BASIC_ENUM_SCHEMA =
+      Schema.builder()
+          .addStringField("key")
+          .addLogicalTypeField("enumeration", BASIC_ENUM_ENUMERATION)
+          .build();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregateBaseValuesGlobally() {
+    Collection<BasicEnum> elements =
+        Lists.newArrayList(
+            BasicEnum.of("a", BasicEnum.Test.ONE), BasicEnum.of("a", BasicEnum.Test.TWO));
+
+    PCollection<Row> aggregate =
+        pipeline
+            .apply(Create.of(elements))
+            .apply(
+                Group.<BasicEnum>globally()
+                    .aggregateFieldBaseValue("enumeration", Sum.ofIntegers(), "enum_sum"));
+    Schema aggregateSchema = Schema.builder().addInt32Field("enum_sum").build();
+    Row expectedRow = Row.withSchema(aggregateSchema).addValues(3).build();
+    PAssert.that(aggregate).containsInAnyOrder(expectedRow);
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregateLogicalValuesGlobally() {
+    Collection<BasicEnum> elements =
+        Lists.newArrayList(
+            BasicEnum.of("a", BasicEnum.Test.ONE), BasicEnum.of("a", BasicEnum.Test.TWO));
+
+    SampleAnyCombineFn<EnumerationType.Value> sampleAnyCombineFn = new SampleAnyCombineFn<>(100);
 
 Review comment:
   Does it only work because of type erasure, or is it actually aggregating `EnumerationType.Value` (not `Integer`)?

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389647148
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
 ##########
 @@ -362,7 +371,24 @@ public Boolean getBoolean(int idx) {
   @Nullable
   public <T> T getLogicalTypeValue(int idx, Class<T> clazz) {
     LogicalType logicalType = checkNotNull(getSchema().getField(idx).getType().getLogicalType());
-    return (T) logicalType.toInputType(getValue(idx));
+    return (T) getValue(idx);
+  }
+
+  /**
+   * Returns the Logical Type base type for this field. If there are multiple nested logical types,
+   * they are all resolved to the first non-logical type. {@link IllegalStateException} is thrown if
+   * schema doesn't match.
+   */
+  @Nullable
+  public <T> T getLogicalTypeBaseValue(int idx, Class<T> clazz) {
+    Object value = getValue(idx);
+    FieldType fieldType = getSchema().getField(idx).getType();
+    checkArgument(fieldType.getTypeName().isLogicalType());
 
 Review comment:
   nit: is it necessary to enforce this? If so, the error message could probably be improved.
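A minimal sketch of the kind of message the nit asks for. `Checks`, `checkArgument`, and `checkIsLogicalType` are hypothetical names; the helper mirrors the shape of Guava's `Preconditions.checkArgument(boolean, String, Object...)` (which Beam vendors) but uses `String.format`, which behaves the same for the `%s` placeholders used here. Field and type names are passed as plain strings rather than Beam's `Schema.Field`/`FieldType`. Note that a `checkArgument`-style check throws `IllegalArgumentException`, whereas the Javadoc in the hunk above mentions `IllegalStateException`.

```java
// Hypothetical helper mirroring the shape of Guava's Preconditions.checkArgument:
// fails with an IllegalArgumentException whose message is built from a template.
final class Checks {
  static void checkArgument(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  // Hypothetical precondition for a getLogicalTypeBaseValue-style accessor: the message
  // names the offending field and its actual type instead of failing with no context.
  static void checkIsLogicalType(String fieldName, String typeName, boolean isLogicalType) {
    checkArgument(
        isLogicalType,
        "Field %s has type %s; getLogicalTypeBaseValue requires a logical type",
        fieldName,
        typeName);
  }
}
```

For example, `Checks.checkIsLogicalType("key", "STRING", false)` fails with "Field key has type STRING; getLogicalTypeBaseValue requires a logical type".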

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602418247
 
 
    Run Java PreCommit

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-596817023
 
 
   @alexvanboxel Yes, that is exactly true. This means that the logical type value must now implement equals and hashCode, whereas before it didn't (because we stored only the base value).
   
   We could work around this by always converting to the base type inside of equals and hashCode, though that might be a bit expensive. However, value types without a proper equals are generally discouraged in Java, so I think we can simply document that equals is required.
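A minimal sketch of why the logical-type value now needs `equals`/`hashCode`, assuming for illustration that a Row compares its stored field values with `equals`; a `List<Object>` stands in for the Row here, and the real `Row.equals` may handle some field types specially. `Fraction` and `IdentityFraction` are hypothetical logical-type value classes, not Beam types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical logical-type value WITHOUT equals/hashCode: Object.equals compares identity.
final class IdentityFraction {
  final int num;
  final int den;

  IdentityFraction(int num, int den) {
    this.num = num;
    this.den = den;
  }
}

// Hypothetical logical-type value WITH value semantics (field by field, no normalization).
final class Fraction {
  final int num;
  final int den;

  Fraction(int num, int den) {
    this.num = num;
    this.den = den;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof Fraction)) {
      return false;
    }
    Fraction other = (Fraction) o;
    return num == other.num && den == other.den;
  }

  @Override
  public int hashCode() {
    return Objects.hash(num, den);
  }
}

final class RowEqualityDemo {
  // Stand-in for Row equality: List.equals compares the stored values element by element.
  static boolean rowsEqual(List<Object> a, List<Object> b) {
    return a.equals(b);
  }

  public static void main(String[] args) {
    List<Object> r1 = Arrays.asList("a", new Fraction(1, 2));
    List<Object> r2 = Arrays.asList("a", new Fraction(1, 2));
    System.out.println(rowsEqual(r1, r2)); // true: Fraction implements equals

    List<Object> r3 = Arrays.asList("a", new IdentityFraction(1, 2));
    List<Object> r4 = Arrays.asList("a", new IdentityFraction(1, 2));
    System.out.println(rowsEqual(r3, r4)); // false: falls back to identity comparison
  }
}
```

When only the base value was stored, the second pair would have compared equal, because base types such as `Integer` or `String` already implement `equals`.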

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389653458
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
 ##########
 @@ -126,21 +127,25 @@ public T apply(Row row) {
               valueType,
               typeFactory);
     } else {
-      if (type.getTypeName().isLogicalType()
-          && OneOfType.IDENTIFIER.equals(type.getLogicalType().getIdentifier())) {
+      if (type.isLogicalType(OneOfType.IDENTIFIER)) {
 
 Review comment:
   I probably need to look into the code more. Why is `OneOfType` a special case, while there is a generic branch for other logical types?

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389641989
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java
 ##########
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianShortCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.FloatCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+public class SchemaCoderHelpers {
+  // This contains a map of primitive types to their coders.
+  public static final Map<TypeName, Coder> CODER_MAP =
+      ImmutableMap.<TypeName, Coder>builder()
+          .put(TypeName.BYTE, ByteCoder.of())
+          .put(TypeName.BYTES, ByteArrayCoder.of())
+          .put(TypeName.INT16, BigEndianShortCoder.of())
+          .put(TypeName.INT32, VarIntCoder.of())
+          .put(TypeName.INT64, VarLongCoder.of())
+          .put(TypeName.DECIMAL, BigDecimalCoder.of())
+          .put(TypeName.FLOAT, FloatCoder.of())
+          .put(TypeName.DOUBLE, DoubleCoder.of())
+          .put(TypeName.STRING, StringUtf8Coder.of())
+          .put(TypeName.DATETIME, InstantCoder.of())
+          .put(TypeName.BOOLEAN, BooleanCoder.of())
+          .build();
+
+  private static class LogicalTypeCoder extends Coder {
 
 Review comment:
   There is similar code in https://github.com/apache/beam/pull/11034/commits/cb3dce02ae5b359fcec68ea6cca1e1454eb64f66#diff-7ed3da1c0f656d0645423b9dab4da881R31
   
   There are a few differences that might make sense to incorporate:
   - don't use raw types, for better type safety
   - override `structuralValue`
   - explicitly override `consistentWithEquals`
   - I wish we could do something better for `consistentWithEquals`, but off the top of my head it would be unexpected to add any of this information to `LogicalType`
   - having `registerByteSizeObserver` and `isRegisterByteSizeObserverCheap` might make sense
   - `getCoderArguments` could return `baseCoder`
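The suggestions above can be sketched without Beam on the classpath. `SimpleCoder`, `IntCoder`, `LogicalTypeCoder`, `Digit`, and `LogicalTypeCoderDemo` below are hypothetical, simplified stand-ins, not Beam's `Coder` API. The sketch shows a generic (non-raw) logical-type coder that delegates to a base coder, derives `structuralValue` from the base representation, conservatively reports `consistentWithEquals() == false`, and exposes the base coder through `getCoderArguments`. Byte-size observation is omitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical, heavily simplified coder contract (NOT org.apache.beam.sdk.coders.Coder).
interface SimpleCoder<T> {
  void encode(T value, OutputStream out) throws IOException;

  T decode(InputStream in) throws IOException;

  Object structuralValue(T value);

  boolean consistentWithEquals();

  List<? extends SimpleCoder<?>> getCoderArguments();
}

// Base coder: fixed-width big-endian int.
final class IntCoder implements SimpleCoder<Integer> {
  @Override public void encode(Integer value, OutputStream out) throws IOException {
    new DataOutputStream(out).writeInt(value);
  }

  @Override public Integer decode(InputStream in) throws IOException {
    return new DataInputStream(in).readInt();
  }

  @Override public Object structuralValue(Integer value) { return value; }

  @Override public boolean consistentWithEquals() { return true; }

  @Override public List<? extends SimpleCoder<?>> getCoderArguments() {
    return Collections.emptyList();
  }
}

// Generic logical-type coder: converts to the base type and delegates to the base coder.
final class LogicalTypeCoder<InputT, BaseT> implements SimpleCoder<InputT> {
  private final Function<InputT, BaseT> toBaseType;
  private final Function<BaseT, InputT> toInputType;
  private final SimpleCoder<BaseT> baseCoder;

  LogicalTypeCoder(
      Function<InputT, BaseT> toBaseType,
      Function<BaseT, InputT> toInputType,
      SimpleCoder<BaseT> baseCoder) {
    this.toBaseType = toBaseType;
    this.toInputType = toInputType;
    this.baseCoder = baseCoder;
  }

  @Override public void encode(InputT value, OutputStream out) throws IOException {
    baseCoder.encode(toBaseType.apply(value), out);
  }

  @Override public InputT decode(InputStream in) throws IOException {
    return toInputType.apply(baseCoder.decode(in));
  }

  // Equality via the base representation, so InputT need not implement equals.
  @Override public Object structuralValue(InputT value) {
    return baseCoder.structuralValue(toBaseType.apply(value));
  }

  // Conservative: nothing guarantees InputT.equals agrees with base-type equality.
  @Override public boolean consistentWithEquals() { return false; }

  @Override public List<? extends SimpleCoder<?>> getCoderArguments() {
    return Collections.singletonList(baseCoder);
  }
}

enum Digit { ZERO, ONE, TWO }

final class LogicalTypeCoderDemo {
  static final LogicalTypeCoder<Digit, Integer> DIGIT_CODER =
      new LogicalTypeCoder<Digit, Integer>(Digit::ordinal, i -> Digit.values()[i], new IntCoder());

  // Encodes then decodes a value, wrapping I/O errors as unchecked.
  static Digit roundTrip(Digit value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DIGIT_CODER.encode(value, bytes);
      return DIGIT_CODER.decode(new ByteArrayInputStream(bytes.toByteArray()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Deriving `structuralValue` from the base value is one way to get the "convert to the base type for equality" behavior without requiring `equals` on the logical value itself.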

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602743969
 
 
   Run Java PreCommit

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602397588
 
 
   Run SQL Postcommit

[GitHub] [beam] reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r390766211
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
 ##########
 @@ -99,37 +97,14 @@
 @Experimental(Kind.SCHEMAS)
 public abstract class RowCoderGenerator {
   private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
-  private static final ForLoadedType CODER_TYPE = new ForLoadedType(Coder.class);
-  private static final ForLoadedType LIST_CODER_TYPE = new ForLoadedType(ListCoder.class);
-  private static final ForLoadedType ITERABLE_CODER_TYPE = new ForLoadedType(IterableCoder.class);
-  private static final ForLoadedType MAP_CODER_TYPE = new ForLoadedType(MapCoder.class);
   private static final BitSetCoder NULL_LIST_CODER = BitSetCoder.of();
   private static final VarIntCoder VAR_INT_CODER = VarIntCoder.of();
-  private static final ForLoadedType NULLABLE_CODER = new ForLoadedType(NullableCoder.class);
 
   private static final String CODERS_FIELD_NAME = "FIELD_CODERS";
 
-  // A map of primitive types -> StackManipulations to create their coders.
-  private static final Map<TypeName, StackManipulation> CODER_MAP;
-
   // Cache for Coder class that are already generated.
   private static Map<UUID, Coder<Row>> generatedCoders = Maps.newConcurrentMap();
 
 Review comment:
   done

[GitHub] [beam] reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r390765378
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
 ##########
 @@ -620,6 +531,88 @@ public void testGloballyWithSchemaAggregateFnNestedFields() {
     pipeline.run();
   }
 
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class BasicEnum {
+    enum Test {
+      ZERO,
+      ONE,
+      TWO
+    };
+
+    abstract String getKey();
+
+    abstract Test getEnumeration();
+
+    static BasicEnum of(String key, Test value) {
+      return new AutoValue_GroupTest_BasicEnum(key, value);
+    }
+  }
+
+  static final EnumerationType BASIC_ENUM_ENUMERATION =
+      EnumerationType.create("ZERO", "ONE", "TWO");
+  static final Schema BASIC_ENUM_SCHEMA =
+      Schema.builder()
+          .addStringField("key")
+          .addLogicalTypeField("enumeration", BASIC_ENUM_ENUMERATION)
+          .build();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregateBaseValuesGlobally() {
+    Collection<BasicEnum> elements =
+        Lists.newArrayList(
+            BasicEnum.of("a", BasicEnum.Test.ONE), BasicEnum.of("a", BasicEnum.Test.TWO));
+
+    PCollection<Row> aggregate =
+        pipeline
+            .apply(Create.of(elements))
+            .apply(
+                Group.<BasicEnum>globally()
+                    .aggregateFieldBaseValue("enumeration", Sum.ofIntegers(), "enum_sum"));
+    Schema aggregateSchema = Schema.builder().addInt32Field("enum_sum").build();
+    Row expectedRow = Row.withSchema(aggregateSchema).addValues(3).build();
+    PAssert.that(aggregate).containsInAnyOrder(expectedRow);
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregateLogicalValuesGlobally() {
+    Collection<BasicEnum> elements =
+        Lists.newArrayList(
+            BasicEnum.of("a", BasicEnum.Test.ONE), BasicEnum.of("a", BasicEnum.Test.TWO));
+
+    SampleAnyCombineFn<EnumerationType.Value> sampleAnyCombineFn = new SampleAnyCombineFn<>(100);
 
 Review comment:
   No, it's actually aggregating `EnumerationType.Value`.
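On the type-erasure question raised above: one way to tell the two cases apart in a test is to check the runtime class of the aggregated elements rather than trusting the declared type parameter, which erasure removes at runtime. A minimal sketch; `EnumValue`, `SampleAny`, and `ErasureDemo` are hypothetical stand-ins for `EnumerationType.Value` and `SampleAnyCombineFn`, not Beam classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EnumerationType.Value.
final class EnumValue {
  final int value;

  EnumValue(int value) {
    this.value = value;
  }
}

// Hypothetical generic sampler, shaped like a combiner over T.
final class SampleAny<T> {
  private final List<T> sampled = new ArrayList<>();

  void add(T element) {
    sampled.add(element);
  }

  List<T> result() {
    return sampled;
  }
}

final class ErasureDemo {
  // True only if every element is an EnumValue at runtime, whatever the declared type says.
  static boolean allEnumValues(List<?> elements) {
    for (Object e : elements) {
      if (!(e instanceof EnumValue)) {
        return false;
      }
    }
    return true;
  }

  // Shows what erasure permits: through a raw type, an Integer lands in a
  // SampleAny<EnumValue> with no runtime error at the point of insertion.
  @SuppressWarnings({"unchecked", "rawtypes"})
  static SampleAny<EnumValue> sampleWithSmuggledInteger() {
    SampleAny<EnumValue> sampler = new SampleAny<>();
    sampler.add(new EnumValue(1));
    ((SampleAny) sampler).add(Integer.valueOf(2));
    return sampler;
  }
}
```

A test that only passes elements through a generic combiner cannot distinguish these cases; an explicit `instanceof` check on the output can.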

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r396411943
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java
 ##########
 @@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianShortCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.FloatCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.ReadableInstant;
+
+class SchemaCoderHelpers {
+  // This contains a map of primitive types to their coders.
+  private static final Map<TypeName, Coder> CODER_MAP =
+      ImmutableMap.<TypeName, Coder>builder()
+          .put(TypeName.BYTE, ByteCoder.of())
+          .put(TypeName.BYTES, ByteArrayCoder.of())
+          .put(TypeName.INT16, BigEndianShortCoder.of())
+          .put(TypeName.INT32, VarIntCoder.of())
+          .put(TypeName.INT64, VarLongCoder.of())
+          .put(TypeName.DECIMAL, BigDecimalCoder.of())
+          .put(TypeName.FLOAT, FloatCoder.of())
+          .put(TypeName.DOUBLE, DoubleCoder.of())
+          .put(TypeName.STRING, StringUtf8Coder.of())
+          .put(TypeName.DATETIME, InstantCoder.of())
+          .put(TypeName.BOOLEAN, BooleanCoder.of())
+          .build();
+
+  private static class LogicalTypeCoder<InputT, BaseT> extends Coder<InputT> {
+    private final LogicalType<InputT, BaseT> logicalType;
+    private final Coder<BaseT> baseTypeCoder;
+    private final boolean isDateTime;
+
+    LogicalTypeCoder(LogicalType<InputT, BaseT> logicalType, Coder baseTypeCoder) {
+      this.logicalType = logicalType;
+      this.baseTypeCoder = baseTypeCoder;
+      this.isDateTime = logicalType.getBaseType().equals(FieldType.DATETIME);
+    }
+
+    @Override
+    public void encode(InputT value, OutputStream outStream) throws CoderException, IOException {
+      BaseT baseType = logicalType.toBaseType(value);
+      if (isDateTime) {
+        baseType = (BaseT) ((ReadableInstant) baseType).toInstant();
 
 Review comment:
   Why is this check needed?
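
The thread does not state the reason, but one plausible explanation follows from the diff. `InstantCoder` encodes a concrete Joda `Instant`, while a logical type whose base type is DATETIME may return some other `ReadableInstant` from `toBaseType`, so the value is normalized with `toInstant()` before it reaches the base coder. Below is a dependency-free sketch of that idea. It is an analogue, not Beam code: `java.time.Instant` stands in for Joda's `Instant`, `ZonedDateTime` stands in for another instant-like type, and the class names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAccessor;
import java.util.function.Function;

// Hypothetical base-type coder that, like InstantCoder, accepts only one concrete class.
final class InstantMillisCoder {
  private InstantMillisCoder() {}

  static byte[] encode(Instant value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(value.toEpochMilli());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static Instant decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      return Instant.ofEpochMilli(in.readLong());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

// Hypothetical logical-type coder whose toBaseType may return any instant-like value.
final class DateTimeLogicalTypeCoder<InputT> {
  private final Function<InputT, TemporalAccessor> toBaseType;
  private final Function<Instant, InputT> toInputType;

  DateTimeLogicalTypeCoder(
      Function<InputT, TemporalAccessor> toBaseType, Function<Instant, InputT> toInputType) {
    this.toBaseType = toBaseType;
    this.toInputType = toInputType;
  }

  byte[] encode(InputT value) {
    TemporalAccessor base = toBaseType.apply(value);
    // The base coder accepts only Instant, so convert other instant-like values first.
    // This mirrors the toInstant() call guarded by isDateTime in the diff.
    Instant normalized = (base instanceof Instant) ? (Instant) base : Instant.from(base);
    return InstantMillisCoder.encode(normalized);
  }

  InputT decode(byte[] encoded) {
    return toInputType.apply(InstantMillisCoder.decode(encoded));
  }
}
```

In Beam itself the coders are used through raw types, so skipping the conversion would likely surface as a runtime `ClassCastException` rather than a compile error.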

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-601452425
 
 
   @alexvanboxel I've rebased on top of your PR now. The only conflicts were in Row.java, which I've now handled.

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389645792
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
 ##########
 @@ -592,41 +593,66 @@ public void testEnumFieldToRow() throws NoSuchSchemaException {
     SchemaRegistry registry = SchemaRegistry.createDefault();
     Schema schema = registry.getSchema(PojoWithEnum.class);
     SchemaTestUtils.assertSchemaEquivalent(POJO_WITH_ENUM_SCHEMA, schema);
-    EnumerationType enumerationType =
-        POJO_WITH_ENUM_SCHEMA.getField(0).getType().getLogicalType(EnumerationType.class);
+    EnumerationType enumerationType = ENUMERATION;
 
+    List<EnumerationType.Value> allColors =
+        Lists.newArrayList(
+            enumerationType.valueOf("RED"),
+            enumerationType.valueOf("GREEN"),
+            enumerationType.valueOf("BLUE"));
     Row redRow =
-        Row.withSchema(POJO_WITH_ENUM_SCHEMA).addValue(enumerationType.valueOf("RED")).build();
+        Row.withSchema(POJO_WITH_ENUM_SCHEMA)
+            .addValues(enumerationType.valueOf("RED"), allColors)
+            .build();
     Row greenRow =
-        Row.withSchema(POJO_WITH_ENUM_SCHEMA).addValue(enumerationType.valueOf("GREEN")).build();
+        Row.withSchema(POJO_WITH_ENUM_SCHEMA)
+            .addValues(enumerationType.valueOf("GREEN"), allColors)
+            .build();
     Row blueRow =
-        Row.withSchema(POJO_WITH_ENUM_SCHEMA).addValue(enumerationType.valueOf("BLUE")).build();
+        Row.withSchema(POJO_WITH_ENUM_SCHEMA)
+            .addValues(enumerationType.valueOf("BLUE"), allColors)
+            .build();
+
+    List<Color> allColorsJava = Lists.newArrayList(Color.RED, Color.GREEN, Color.BLUE);
 
     SerializableFunction<PojoWithEnum, Row> toRow = registry.getToRowFunction(PojoWithEnum.class);
-    assertEquals(redRow, toRow.apply(new PojoWithEnum(Color.RED)));
-    assertEquals(greenRow, toRow.apply(new PojoWithEnum(Color.GREEN)));
-    assertEquals(blueRow, toRow.apply(new PojoWithEnum(Color.BLUE)));
+    System.err.println("CONVERTED " + toRow.apply(new PojoWithEnum(Color.RED, allColorsJava)));
 
 Review comment:
   nit: we probably don't want `System.err.println` in tests

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389642843
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java
 ##########
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianShortCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.FloatCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+public class SchemaCoderHelpers {
+  // This contains a map of primitive types to their coders.
+  public static final Map<TypeName, Coder> CODER_MAP =
+      ImmutableMap.<TypeName, Coder>builder()
+          .put(TypeName.BYTE, ByteCoder.of())
+          .put(TypeName.BYTES, ByteArrayCoder.of())
+          .put(TypeName.INT16, BigEndianShortCoder.of())
+          .put(TypeName.INT32, VarIntCoder.of())
+          .put(TypeName.INT64, VarLongCoder.of())
+          .put(TypeName.DECIMAL, BigDecimalCoder.of())
+          .put(TypeName.FLOAT, FloatCoder.of())
+          .put(TypeName.DOUBLE, DoubleCoder.of())
+          .put(TypeName.STRING, StringUtf8Coder.of())
+          .put(TypeName.DATETIME, InstantCoder.of())
+          .put(TypeName.BOOLEAN, BooleanCoder.of())
+          .build();
+
+  private static class LogicalTypeCoder extends Coder {
+    private final LogicalType logicalType;
+    private final Coder baseTypeCoder;
+
+    public LogicalTypeCoder(LogicalType logicalType, Coder baseTypeCoder) {
+      this.logicalType = logicalType;
+      this.baseTypeCoder = baseTypeCoder;
+    }
+
+    @Override
+    public void encode(Object value, OutputStream outStream) throws CoderException, IOException {
+      Object baseType = logicalType.toBaseType(value);
+      baseTypeCoder.encode(baseType, outStream);
+    }
+
+    @Override
+    public Object decode(InputStream inStream) throws CoderException, IOException {
+      Object baseType = baseTypeCoder.decode(inStream);
+      return logicalType.toInputType(baseType);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      baseTypeCoder.verifyDeterministic();
+    }
+  }
+
+  /** Returns the coder used for a given primitive type. */
+  public static <T> Coder<T> coderForFieldType(FieldType fieldType) {
+    Coder<T> coder;
+    switch (fieldType.getTypeName()) {
+      case ROW:
+        coder = (Coder<T>) SchemaCoder.of(fieldType.getRowSchema());
+        break;
+      case ARRAY:
+        coder = (Coder<T>) ListCoder.of(coderForFieldType(fieldType.getCollectionElementType()));
+        break;
+      case ITERABLE:
+        coder =
+            (Coder<T>) IterableCoder.of(coderForFieldType(fieldType.getCollectionElementType()));
+        break;
+      case MAP:
+        coder =
+            (Coder<T>)
+                MapCoder.of(
+                    coderForFieldType(fieldType.getMapKeyType()),
+                    coderForFieldType(fieldType.getMapValueType()));
+        break;
+      case LOGICAL_TYPE:
+        coder =
+            new LogicalTypeCoder(
+                fieldType.getLogicalType(),
+                coderForFieldType(fieldType.getLogicalType().getBaseType()));
+        break;
+      default:
+        coder = (Coder<T>) CODER_MAP.get(fieldType.getTypeName());
 
 Review comment:
   What about replacing `default` with explicit cases? There is an Error Prone rule that checks that switch cases are exhaustive, which would help if we add a new `TypeName`. If not, it might make sense to throw a more descriptive exception if `coder` is null.
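
A dependency-free sketch of the first suggestion. The enum is a hypothetical subset of `Schema.TypeName`, and coder names stand in for real coder instances. Because the switch has no `default` branch, a checker that enforces exhaustive enum switches (Error Prone's `MissingCasesInEnumSwitch` is, to my knowledge, the relevant one) can report any newly added constant that lacks a case; the trailing `throw` exists only because javac does not treat a classic enum switch as exhaustive.

```java
final class CoderLookup {
  // Hypothetical subset of Schema.TypeName, for illustration only.
  enum TypeName { BYTE, INT32, INT64, STRING, ROW, LOGICAL_TYPE }

  private CoderLookup() {}

  /** Maps every TypeName to a coder name, listing each case explicitly with no default. */
  static String coderNameFor(TypeName typeName) {
    switch (typeName) {
      case BYTE:
        return "ByteCoder";
      case INT32:
        return "VarIntCoder";
      case INT64:
        return "VarLongCoder";
      case STRING:
        return "StringUtf8Coder";
      case ROW:
        return "SchemaCoder";
      case LOGICAL_TYPE:
        return "LogicalTypeCoder";
    }
    // Unreachable for the current constants; javac still requires a terminal statement.
    throw new IllegalArgumentException("Unhandled type name: " + typeName);
  }
}
```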

[GitHub] [beam] reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r390764992
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java
 ##########
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianShortCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.FloatCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+public class SchemaCoderHelpers {
+  // This contains a map of primitive types to their coders.
+  public static final Map<TypeName, Coder> CODER_MAP =
+      ImmutableMap.<TypeName, Coder>builder()
+          .put(TypeName.BYTE, ByteCoder.of())
+          .put(TypeName.BYTES, ByteArrayCoder.of())
+          .put(TypeName.INT16, BigEndianShortCoder.of())
+          .put(TypeName.INT32, VarIntCoder.of())
+          .put(TypeName.INT64, VarLongCoder.of())
+          .put(TypeName.DECIMAL, BigDecimalCoder.of())
+          .put(TypeName.FLOAT, FloatCoder.of())
+          .put(TypeName.DOUBLE, DoubleCoder.of())
+          .put(TypeName.STRING, StringUtf8Coder.of())
+          .put(TypeName.DATETIME, InstantCoder.of())
+          .put(TypeName.BOOLEAN, BooleanCoder.of())
+          .build();
+
+  private static class LogicalTypeCoder extends Coder {
+    private final LogicalType logicalType;
+    private final Coder baseTypeCoder;
+
+    public LogicalTypeCoder(LogicalType logicalType, Coder baseTypeCoder) {
+      this.logicalType = logicalType;
+      this.baseTypeCoder = baseTypeCoder;
+    }
+
+    @Override
+    public void encode(Object value, OutputStream outStream) throws CoderException, IOException {
+      Object baseType = logicalType.toBaseType(value);
+      baseTypeCoder.encode(baseType, outStream);
+    }
+
+    @Override
+    public Object decode(InputStream inStream) throws CoderException, IOException {
+      Object baseType = baseTypeCoder.decode(inStream);
+      return logicalType.toInputType(baseType);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      baseTypeCoder.verifyDeterministic();
+    }
+  }
+
+  /** Returns the coder used for a given primitive type. */
+  public static <T> Coder<T> coderForFieldType(FieldType fieldType) {
+    Coder<T> coder;
+    switch (fieldType.getTypeName()) {
+      case ROW:
+        coder = (Coder<T>) SchemaCoder.of(fieldType.getRowSchema());
+        break;
+      case ARRAY:
+        coder = (Coder<T>) ListCoder.of(coderForFieldType(fieldType.getCollectionElementType()));
+        break;
+      case ITERABLE:
+        coder =
+            (Coder<T>) IterableCoder.of(coderForFieldType(fieldType.getCollectionElementType()));
+        break;
+      case MAP:
+        coder =
+            (Coder<T>)
+                MapCoder.of(
+                    coderForFieldType(fieldType.getMapKeyType()),
+                    coderForFieldType(fieldType.getMapValueType()));
+        break;
+      case LOGICAL_TYPE:
+        coder =
+            new LogicalTypeCoder(
+                fieldType.getLogicalType(),
+                coderForFieldType(fieldType.getLogicalType().getBaseType()));
+        break;
+      default:
+        coder = (Coder<T>) CODER_MAP.get(fieldType.getTypeName());
 
 Review comment:
   Added a null check.
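
The thread does not show the exact check that was added. A minimal sketch of the null-check alternative, using a hypothetical `PrimitiveCoders` class and coder names in place of real coders: the map lookup fails fast with the offending type name instead of silently returning `null` to the caller.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

final class PrimitiveCoders {
  // Hypothetical subset of primitive type names; DATETIME is deliberately left unregistered.
  enum TypeName { BYTE, INT32, STRING, DATETIME }

  private static final Map<TypeName, String> CODER_MAP;

  static {
    Map<TypeName, String> coders = new EnumMap<>(TypeName.class);
    coders.put(TypeName.BYTE, "ByteCoder");
    coders.put(TypeName.INT32, "VarIntCoder");
    coders.put(TypeName.STRING, "StringUtf8Coder");
    CODER_MAP = Collections.unmodifiableMap(coders);
  }

  private PrimitiveCoders() {}

  static String coderFor(TypeName typeName) {
    String coder = CODER_MAP.get(typeName);
    if (coder == null) {
      // Name the unsupported type so the failure is easy to diagnose.
      throw new IllegalArgumentException("No coder registered for primitive type " + typeName);
    }
    return coder;
  }
}
```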

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r389650187
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
 ##########
 @@ -119,27 +86,6 @@ protected SchemaCoder(
     return RowCoder.of(schema);
   }
 
-  /** Returns the coder used for a given primitive type. */
-  public static <T> Coder<T> coderForFieldType(FieldType fieldType) {
 
 Review comment:
   I found this useful to have, and it was public. Instead, I would suggest making `SchemaCoderHelpers` package-private and having this method delegate to `SchemaCoderHelpers`.
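
A minimal sketch of the suggested layout, with hypothetical class names (`CoderHelpers`, `CoderFacade`) and string coder names as placeholders. The helper class is package-private, while the public method that existing callers depend on stays in place and simply delegates to it.

```java
// Package-private helper: visible only inside its package, as suggested for SchemaCoderHelpers.
// The string-based lookup is a placeholder for the real coder construction.
final class CoderHelpers {
  private CoderHelpers() {}

  static String coderForFieldType(String typeName) {
    switch (typeName) {
      case "INT32":
        return "VarIntCoder";
      case "STRING":
        return "StringUtf8Coder";
      default:
        throw new IllegalArgumentException("Unsupported type: " + typeName);
    }
  }
}

// Stand-in for the public SchemaCoder. In a real project this class would be declared
// public in its own file, so the existing public method keeps working for callers.
class CoderFacade {
  /** Existing public entry point, now a thin delegate to the package-private helper. */
  public static String coderForFieldType(String typeName) {
    return CoderHelpers.coderForFieldType(typeName);
  }
}
```

This keeps the public surface unchanged while leaving the helper free to change without breaking external users.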

[GitHub] [beam] kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
kanterov commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r391094901
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
 ##########
 @@ -620,6 +531,88 @@ public void testGloballyWithSchemaAggregateFnNestedFields() {
     pipeline.run();
   }
 
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class BasicEnum {
+    enum Test {
+      ZERO,
+      ONE,
+      TWO
+    };
+
+    abstract String getKey();
+
+    abstract Test getEnumeration();
+
+    static BasicEnum of(String key, Test value) {
+      return new AutoValue_GroupTest_BasicEnum(key, value);
+    }
+  }
+
+  static final EnumerationType BASIC_ENUM_ENUMERATION =
+      EnumerationType.create("ZERO", "ONE", "TWO");
+  static final Schema BASIC_ENUM_SCHEMA =
+      Schema.builder()
+          .addStringField("key")
+          .addLogicalTypeField("enumeration", BASIC_ENUM_ENUMERATION)
+          .build();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregateBaseValuesGlobally() {
+    Collection<BasicEnum> elements =
+        Lists.newArrayList(
+            BasicEnum.of("a", BasicEnum.Test.ONE), BasicEnum.of("a", BasicEnum.Test.TWO));
+
+    PCollection<Row> aggregate =
+        pipeline
+            .apply(Create.of(elements))
+            .apply(
+                Group.<BasicEnum>globally()
+                    .aggregateFieldBaseValue("enumeration", Sum.ofIntegers(), "enum_sum"));
+    Schema aggregateSchema = Schema.builder().addInt32Field("enum_sum").build();
+    Row expectedRow = Row.withSchema(aggregateSchema).addValues(3).build();
+    PAssert.that(aggregate).containsInAnyOrder(expectedRow);
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregateLogicalValuesGlobally() {
+    Collection<BasicEnum> elements =
+        Lists.newArrayList(
+            BasicEnum.of("a", BasicEnum.Test.ONE), BasicEnum.of("a", BasicEnum.Test.TWO));
+
+    SampleAnyCombineFn<EnumerationType.Value> sampleAnyCombineFn = new SampleAnyCombineFn<>(100);
 
 Review comment:
   I see; there are two methods: `aggregateFieldBaseValue` and `aggregateField`.
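
Judging from the tests in the diff, `aggregateFieldBaseValue` hands the combine function the logical type's base value (the enumeration's INT32 value, so ONE + TWO sums to 3 in `testAggregateBaseValuesGlobally`), while `aggregateField` hands it the logical value itself, as `SampleAnyCombineFn<EnumerationType.Value>` suggests. A plain-Java sketch of that distinction; the method names are hypothetical and assume the enumeration's base values are 0, 1, 2 in declaration order.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class EnumAggregation {
  enum Test { ZERO, ONE, TWO }

  private EnumAggregation() {}

  // Assumed base value: the declaration index, matching ZERO=0, ONE=1, TWO=2.
  static int toBaseValue(Test value) {
    return value.ordinal();
  }

  /** Analogue of aggregateFieldBaseValue: the combine function sees base ints. */
  static <R> R aggregateBaseValues(List<Test> values, Function<IntStream, R> combineFn) {
    return combineFn.apply(values.stream().mapToInt(EnumAggregation::toBaseValue));
  }

  /** Analogue of aggregateField: the combine function sees the logical values. */
  static <R> R aggregateLogicalValues(List<Test> values, Function<Stream<Test>, R> combineFn) {
    return combineFn.apply(values.stream());
  }
}
```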

[GitHub] [beam] reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11074: Store logical type values in Row instead of base values
URL: https://github.com/apache/beam/pull/11074#discussion_r390766432
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
 ##########
 @@ -126,21 +127,25 @@ public T apply(Row row) {
               valueType,
               typeFactory);
     } else {
-      if (type.getTypeName().isLogicalType()
-          && OneOfType.IDENTIFIER.equals(type.getLogicalType().getIdentifier())) {
+      if (type.isLogicalType(OneOfType.IDENTIFIER)) {
 
 Review comment:
   Because the actual conversion type changes depending on the OneOf case type. There might be a better way to handle this, but I'm not sure what it is yet.
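
A minimal sketch of why a OneOf field needs special handling, using hypothetical names (`OneOfValue`, `convertersByCase`) that are not Beam's API: since each case may map to a different Java type, the converter has to be chosen at runtime from whichever case is set, rather than fixed once per field.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class OneOfConversion {
  /** Hypothetical stand-in for a OneOf value: the active case name plus its raw value. */
  static final class OneOfValue {
    final String caseName;
    final Object value;

    OneOfValue(String caseName, Object value) {
      this.caseName = caseName;
      this.value = value;
    }
  }

  // One converter per case, because each case may need a different target Java type.
  private final Map<String, Function<Object, Object>> convertersByCase;

  OneOfConversion(Map<String, Function<Object, Object>> convertersByCase) {
    this.convertersByCase = new HashMap<>(convertersByCase);
  }

  /** Picks the converter for whichever case is set, then applies it to that case's value. */
  Object toJava(OneOfValue oneOf) {
    Function<Object, Object> converter = convertersByCase.get(oneOf.caseName);
    if (converter == null) {
      throw new IllegalArgumentException("Unknown OneOf case: " + oneOf.caseName);
    }
    return converter.apply(oneOf.value);
  }
}
```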
