Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:20:58 UTC

[GitHub] [beam] ibzib commented on a change in pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

ibzib commented on a change in pull request #14986:
URL: https://github.com/apache/beam/pull/14986#discussion_r650217041



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>

Review comment:
       Add a javadoc comment.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -70,7 +72,16 @@ public String getName() {
 
           @Override
           public RelDataType getType(RelDataTypeFactory typeFactory) {
-            return CalciteUtils.sqlTypeWithAutoCast(typeFactory, getInputType());
+            Type inputType = getInputType();
+            if (inputType instanceof TypeVariable) {
+              throw new IllegalArgumentException(
+                  "Unable to infer SQL type from type variable "
+                      + inputType
+                      + ". This usually means you are trying to use a generic type whose type information "
+                      + "is not known at runtime. You can wrap your CombineFn into typed subclass"
+                      + " by 'new UdfTypeUtils.CombineFnDelegate<>(combineFn) {}'");

Review comment:
       Nit: if the user copies `new UdfTypeUtils.CombineFnDelegate<>(combineFn) {}` verbatim, it will fail to compile (at least on Java 8): `Cannot use '<>' with anonymous inner classes`. Can we put placeholders for the concrete type arguments in the message instead?
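
For illustration, here is a minimal sketch of why the explicit type arguments matter. It uses only `java.lang.reflect`; the `Holder` class and all method names are hypothetical stand-ins, not Beam APIs. An anonymous subclass created with concrete type arguments records them in its generic superclass, whereas one created inside generic code records only the type variables.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class AnonymousSubclassTypes {

  // Hypothetical stand-in for a generic wrapper such as CombineFnDelegate.
  public static class Holder<InputT, OutputT> {
    protected Holder() {}
  }

  // Returns the type argument at `index` of the instance's generic superclass,
  // or null when the superclass is not parameterized.
  public static Type superTypeArgument(Object instance, int index) {
    Type superClass = instance.getClass().getGenericSuperclass();
    if (superClass instanceof ParameterizedType) {
      return ((ParameterizedType) superClass).getActualTypeArguments()[index];
    }
    return null;
  }

  // Concrete type arguments: the anonymous subclass records String and Long.
  public static Type explicitArguments() {
    return superTypeArgument(new Holder<String, Long>() {}, 0);
  }

  // Type arguments that are themselves type variables: only the variable
  // is recorded, so no concrete type can be recovered at runtime.
  public static <A, B> Type variableArguments() {
    return superTypeArgument(new Holder<A, B>() {}, 0);
  }

  public static void main(String[] args) {
    System.out.println(explicitArguments().getTypeName());
    System.out.println(variableArguments().getTypeName());
  }
}
```

Under these assumptions, an error message that spells out placeholders such as `<InputT, AccumT, OutputT>` would steer users toward the first form.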

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -70,7 +72,16 @@ public String getName() {
 
           @Override
           public RelDataType getType(RelDataTypeFactory typeFactory) {
-            return CalciteUtils.sqlTypeWithAutoCast(typeFactory, getInputType());
+            Type inputType = getInputType();
+            if (inputType instanceof TypeVariable) {

Review comment:
       I wonder whether, here and elsewhere, we should check `!(inputType instanceof ParameterizedType)` instead of `inputType instanceof TypeVariable`. I'm not sure we are able to handle `GenericArrayType` or `WildcardType` either. https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/reflect/Type.html
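
As a rough aid for this discussion, the sketch below classifies a few reflected types by the `java.lang.reflect.Type` subinterface they implement. The `Samples` class and the method names are hypothetical and exist only for this illustration. One thing it makes visible: a non-generic type such as `Integer` is reported as a plain `Class`, so a check of the form `!(inputType instanceof ParameterizedType)` would reject it as well.

```java
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.List;

public class TypeKinds {

  // Hypothetical holder whose parameter types cover the Type subinterfaces.
  public static class Samples<T> {
    public void accept(
        Integer plain,
        List<String> parameterized,
        T variable,
        T[] genericArray,
        List<? extends Number> withWildcard) {}
  }

  // Names the reflection interface (or Class) that the given type implements.
  public static String kindOf(Type type) {
    if (type instanceof Class) return "Class";
    if (type instanceof ParameterizedType) return "ParameterizedType";
    if (type instanceof TypeVariable) return "TypeVariable";
    if (type instanceof GenericArrayType) return "GenericArrayType";
    if (type instanceof WildcardType) return "WildcardType";
    return "unknown";
  }

  // Generic parameter types of Samples.accept, in declaration order.
  public static Type[] sampleTypes() {
    for (Method method : Samples.class.getDeclaredMethods()) {
      if (method.getName().equals("accept")) {
        return method.getGenericParameterTypes();
      }
    }
    throw new IllegalStateException("accept method not found");
  }

  public static void main(String[] args) {
    Type[] types = sampleTypes();
    for (Type type : types) {
      System.out.println(type.getTypeName() + " is a " + kindOf(type));
    }
    // A wildcard only appears as a type argument of a parameterized type.
    Type wildcard = ((ParameterizedType) types[4]).getActualTypeArguments()[0];
    System.out.println(wildcard.getTypeName() + " is a " + kindOf(wildcard));
  }
}
```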

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {

Review comment:
       Why do we need this outer class?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>
+      extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+    private final Combine.CombineFn<InputT, AccumT, OutputT> delegate;
+
+    protected CombineFnDelegate(Combine.CombineFn<InputT, AccumT, OutputT> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public AccumT createAccumulator() {
+      return delegate.createAccumulator();
+    }
+
+    @Override
+    public AccumT addInput(AccumT mutableAccumulator, InputT input) {
+      return delegate.addInput(mutableAccumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return delegate.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(AccumT accumulator) {
+      return delegate.extractOutput(accumulator);
+    }
+
+    @Override
+    public AccumT compact(AccumT accumulator) {
+      return delegate.compact(accumulator);
+    }
+
+    @Override
+    public OutputT apply(Iterable<? extends InputT> inputs) {
+      return delegate.apply(inputs);
+    }
+
+    @Override
+    public OutputT defaultValue() {
+      return delegate.defaultValue();
+    }
+
+    @Override
+    public TypeDescriptor<OutputT> getOutputType() {
+      return Optional.<TypeDescriptor<OutputT>>ofNullable(getGenericSuperTypeAtIndex(2))
+          .orElse(delegate.getOutputType());
+    }
+
+    @Override
+    public TypeDescriptor<InputT> getInputType() {
+      return Optional.<TypeDescriptor<InputT>>ofNullable(getGenericSuperTypeAtIndex(0))
+          .orElse(delegate.getInputType());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private <T> TypeDescriptor<T> getGenericSuperTypeAtIndex(int index) {
+      Type superClass = getClass().getGenericSuperclass();
+      if (superClass != null) {
+        ParameterizedType superType = (ParameterizedType) superClass;

Review comment:
       What if `superClass` isn't an instance of `ParameterizedType`? IIRC that can happen with multiple layers of inheritance. (Not sure why anyone would do that, but we should probably handle it regardless.)
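
To make the concern concrete, here is a small sketch using only `java.lang.reflect`. The classes `Base`, `Typed` and `Leaf` and the helper `firstTypeArgument` are hypothetical, not part of this PR. With two layers of inheritance, `getGenericSuperclass()` on the leaf returns a plain `Class`, so an unconditional cast to `ParameterizedType` would throw a `ClassCastException`. One possible way to guard against that, assuming we only want the nearest parameterized ancestor, is an `instanceof` check while walking up the hierarchy.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class SuperclassLookup {

  // Hypothetical three-level hierarchy: only Typed has a parameterized superclass.
  public static class Base<T> {}

  public static class Typed extends Base<String> {}

  public static class Leaf extends Typed {}

  // Walks up the class hierarchy and returns the first type argument of the
  // nearest parameterized superclass, or null when none is found.
  public static Type firstTypeArgument(Class<?> clazz) {
    for (Class<?> current = clazz; current != null; current = current.getSuperclass()) {
      Type superClass = current.getGenericSuperclass();
      if (superClass instanceof ParameterizedType) {
        return ((ParameterizedType) superClass).getActualTypeArguments()[0];
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // Leaf's direct generic superclass is the raw class Typed, not a ParameterizedType.
    System.out.println(Leaf.class.getGenericSuperclass() instanceof ParameterizedType);
    // The guarded lookup still finds the argument declared one level up.
    System.out.println(firstTypeArgument(Leaf.class));
  }
}
```

This sketch stops at the first parameterized ancestor and does not resolve type variables across levels, so it is only a starting point for the real fix.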

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>
+      extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+    private final Combine.CombineFn<InputT, AccumT, OutputT> delegate;
+
+    protected CombineFnDelegate(Combine.CombineFn<InputT, AccumT, OutputT> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public AccumT createAccumulator() {
+      return delegate.createAccumulator();
+    }
+
+    @Override
+    public AccumT addInput(AccumT mutableAccumulator, InputT input) {
+      return delegate.addInput(mutableAccumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return delegate.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(AccumT accumulator) {
+      return delegate.extractOutput(accumulator);
+    }
+
+    @Override
+    public AccumT compact(AccumT accumulator) {
+      return delegate.compact(accumulator);
+    }
+
+    @Override
+    public OutputT apply(Iterable<? extends InputT> inputs) {
+      return delegate.apply(inputs);
+    }
+
+    @Override
+    public OutputT defaultValue() {
+      return delegate.defaultValue();
+    }
+
+    @Override
+    public TypeDescriptor<OutputT> getOutputType() {

Review comment:
       Nit: we could move `getOutputType` and `getInputType` to the top of the class to highlight that they're the only methods that don't just defer to the delegate.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -102,17 +117,23 @@ protected Type getOutputType() {
     return combineFn.getOutputType().getType();
   }
 
+  @Nullable
+  private Type getDeclaredInputType() {

Review comment:
       Why do we need a separate method?

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFnTest.java
##########
@@ -63,6 +69,20 @@ public void mergeAccumulators() {
     assertEquals(2L, merged);
   }
 
+  @Test
+  public void testParameterExtractionFromCombineFn_CombineFnDelegate() {

Review comment:
       This test is in class `LazyAggregateCombineFnTest`, but it doesn't use `LazyAggregateCombineFn` at all. Since `UdafImplTest.java` doesn't exist, let's create a new file for this test.



