Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:52:19 UTC

[GitHub] [beam] je-ik commented on a change in pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

je-ik commented on a change in pull request #14986:
URL: https://github.com/apache/beam/pull/14986#discussion_r650365075



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -70,7 +72,16 @@ public String getName() {
 
           @Override
           public RelDataType getType(RelDataTypeFactory typeFactory) {
-            return CalciteUtils.sqlTypeWithAutoCast(typeFactory, getInputType());
+            Type inputType = getInputType();
+            if (inputType instanceof TypeVariable) {
+              throw new IllegalArgumentException(
+                  "Unable to infer SQL type from type variable "
+                      + inputType
+                      + ". This usually means you are trying to use a generic type whose type information "
+                      + "is not known at runtime. You can wrap your CombineFn into typed subclass"
+                      + " by 'new UdfTypeUtils.CombineFnDelegate<>(combineFn) {}'");

Review comment:
       The code is valid in JDK 11+, since using the diamond operator with an anonymous class requires Java 9 or later. Maybe JDK 8 users will be able to figure out how to fix that? Or we can add `<...>`.
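
To illustrate the JDK 8 alternative: an anonymous subclass with explicitly spelled-out type arguments also compiles on Java 8, and those arguments are recorded as the subclass's generic superclass, so they can be read back at runtime. This is a minimal sketch, not the PR's code. `Fn` and `Delegate` are hypothetical stand-ins for Beam's `CombineFn` and `UdfTypeUtils.CombineFnDelegate`, which lets the example run without Beam on the classpath.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class TypeCaptureDemo {
  // Hypothetical stand-in for a generic function such as Beam's Combine.CombineFn.
  abstract static class Fn<InputT, OutputT> {
    abstract OutputT apply(InputT input);
  }

  // Hypothetical delegate, similar in spirit to UdfTypeUtils.CombineFnDelegate.
  static class Delegate<InputT, OutputT> extends Fn<InputT, OutputT> {
    private final Fn<InputT, OutputT> delegate;

    Delegate(Fn<InputT, OutputT> delegate) {
      this.delegate = delegate;
    }

    @Override
    OutputT apply(InputT input) {
      return delegate.apply(input);
    }

    // Type argument at `index` of the runtime class's direct generic superclass.
    // For a plain Delegate that superclass is Fn<InputT, OutputT> (type variables only);
    // for an anonymous subclass it is Delegate<...> with the arguments from the call site.
    Type superTypeArgument(int index) {
      Type superClass = getClass().getGenericSuperclass();
      return superClass instanceof ParameterizedType
          ? ((ParameterizedType) superClass).getActualTypeArguments()[index]
          : null;
    }
  }

  static Fn<String, Integer> lengthFn() {
    return new Fn<String, Integer>() {
      @Override
      Integer apply(String s) {
        return s.length();
      }
    };
  }

  // Plain instance: the input type is only visible as the type variable InputT.
  static Type plainInputType(Fn<String, Integer> fn) {
    return new Delegate<String, Integer>(fn).superTypeArgument(0);
  }

  // Anonymous subclass with explicit type arguments (no diamond, so valid on JDK 8):
  // the class file records Delegate<String, Integer> as the generic superclass.
  static Type capturedInputType(Fn<String, Integer> fn) {
    return new Delegate<String, Integer>(fn) {}.superTypeArgument(0);
  }
}
```

On Java 9+ the diamond form `new Delegate<>(fn) {}` should behave the same when the inferred arguments are concrete; spelling them out is simply the variant that also works on Java 8.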

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -70,7 +72,16 @@ public String getName() {
 
           @Override
           public RelDataType getType(RelDataTypeFactory typeFactory) {
-            return CalciteUtils.sqlTypeWithAutoCast(typeFactory, getInputType());
+            Type inputType = getInputType();
+            if (inputType instanceof TypeVariable) {

Review comment:
       Nice catch! We should handle at least `GenericArrayType`, as that can occur when the class extends `CombineFn<List<String>[], ...>`. `WildcardType` should not be possible there, because one cannot write `extends X<?>`.
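
A minimal sketch of a check that goes beyond `inputType instanceof TypeVariable`: it descends into `GenericArrayType` (and `ParameterizedType`) so that `E[]` is rejected while `List<String>[]` is accepted. The helper `isFullyResolved` and the demo classes are hypothetical, not part of the PR; treating nested wildcards as unresolved is a conservative assumption.

```java
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;

class TypeResolvability {
  // Returns true if `type` contains no type variables anywhere inside it.
  static boolean isFullyResolved(Type type) {
    if (type instanceof Class) {
      return true; // a plain, non-generic class
    }
    if (type instanceof TypeVariable) {
      return false;
    }
    if (type instanceof GenericArrayType) {
      // e.g. List<String>[] or E[]: check the component type recursively.
      return isFullyResolved(((GenericArrayType) type).getGenericComponentType());
    }
    if (type instanceof ParameterizedType) {
      for (Type arg : ((ParameterizedType) type).getActualTypeArguments()) {
        if (!isFullyResolved(arg)) {
          return false;
        }
      }
      return true;
    }
    // WildcardType cannot be a direct supertype argument, but it can appear nested
    // (e.g. List<?>); this sketch conservatively treats it as unresolved.
    return false;
  }

  // Demo classes with different kinds of supertype arguments.
  abstract static class Base<T> {}

  static class ArrayOfLists extends Base<List<String>[]> {}

  static class ArrayOfVar<E> extends Base<E[]> {}

  static class Plain extends Base<String> {}

  // First type argument of the direct generic superclass.
  static Type firstSuperTypeArg(Class<?> clazz) {
    return ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];
  }
}
```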

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -102,17 +117,23 @@ protected Type getOutputType() {
     return combineFn.getOutputType().getType();
   }
 
+  @Nullable
+  private Type getDeclaredInputType() {

Review comment:
       Hm, I didn't want to interfere with the previous logic too much, but maybe we can unify the two. `getOutputType` calls `combineFn.getOutputType()` anyway.
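
One way the unification could look, sketched under the assumption that both types are read from the same generic superclass (index 0 for `InputT`, index 2 for `OutputT`, matching `CombineFn<InputT, AccumT, OutputT>`). `Fn`, `declaredTypeArgument`, and the `fallback*` methods are hypothetical; the fallbacks stand in for whatever the previous logic returned. The `instanceof ParameterizedType` guard also avoids the unconditional cast used in `getGenericSuperTypeAtIndex`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.Optional;

class UnifiedTypeLookup {
  // Hypothetical stand-in for Combine.CombineFn<InputT, AccumT, OutputT>.
  abstract static class Fn<InputT, AccumT, OutputT> {
    // Placeholders for the pre-existing lookup logic.
    Type fallbackInputType() {
      return Object.class;
    }

    Type fallbackOutputType() {
      return Object.class;
    }

    // Single helper shared by input and output lookup: the type argument at `index`
    // of the runtime class's direct superclass, unless that superclass is not
    // parameterized (e.g. a raw subclass) or the argument is an unresolved type variable.
    Optional<Type> declaredTypeArgument(int index) {
      Type superClass = getClass().getGenericSuperclass();
      if (!(superClass instanceof ParameterizedType)) {
        return Optional.empty(); // avoids a ClassCastException
      }
      Type arg = ((ParameterizedType) superClass).getActualTypeArguments()[index];
      return arg instanceof TypeVariable ? Optional.empty() : Optional.of(arg);
    }

    Type inputType() {
      return declaredTypeArgument(0).orElseGet(this::fallbackInputType);
    }

    Type outputType() {
      return declaredTypeArgument(2).orElseGet(this::fallbackOutputType);
    }
  }

  static class Sum extends Fn<Integer, long[], Long> {}

  static class Generic<T> extends Fn<T, Object, T> {}

  @SuppressWarnings("rawtypes")
  static class Raw extends Fn {}
}
```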

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFnTest.java
##########
@@ -63,6 +69,20 @@ public void mergeAccumulators() {
     assertEquals(2L, merged);
   }
 
+  @Test
+  public void testParameterExtractionFromCombineFn_CombineFnDelegate() {

Review comment:
       Alright, we will probably need more tests anyway, to cover the handling of `GenericArrayType`.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>
+      extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+    private final Combine.CombineFn<InputT, AccumT, OutputT> delegate;
+
+    protected CombineFnDelegate(Combine.CombineFn<InputT, AccumT, OutputT> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public AccumT createAccumulator() {
+      return delegate.createAccumulator();
+    }
+
+    @Override
+    public AccumT addInput(AccumT mutableAccumulator, InputT input) {
+      return delegate.addInput(mutableAccumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return delegate.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(AccumT accumulator) {
+      return delegate.extractOutput(accumulator);
+    }
+
+    @Override
+    public AccumT compact(AccumT accumulator) {
+      return delegate.compact(accumulator);
+    }
+
+    @Override
+    public OutputT apply(Iterable<? extends InputT> inputs) {
+      return delegate.apply(inputs);
+    }
+
+    @Override
+    public OutputT defaultValue() {
+      return delegate.defaultValue();
+    }
+
+    @Override
+    public TypeDescriptor<OutputT> getOutputType() {
+      return Optional.<TypeDescriptor<OutputT>>ofNullable(getGenericSuperTypeAtIndex(2))
+          .orElse(delegate.getOutputType());
+    }
+
+    @Override
+    public TypeDescriptor<InputT> getInputType() {
+      return Optional.<TypeDescriptor<InputT>>ofNullable(getGenericSuperTypeAtIndex(0))
+          .orElse(delegate.getInputType());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private <T> TypeDescriptor<T> getGenericSuperTypeAtIndex(int index) {
+      Type superClass = getClass().getGenericSuperclass();
+      if (superClass != null) {
+        ParameterizedType superType = (ParameterizedType) superClass;

Review comment:
       agree

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>

Review comment:
       :+1:

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+public class UdfTypeUtils {

Review comment:
       We do not need it. I just couldn't come up with a clear name for the inner class. It is related solely to the "type erasure problem", so what would be the best name? `CombineFnDelegate` as a top-level class in package `org.apache.beam.sdk.extensions.sql` feels weird to me. Maybe we could create a subpackage in `org.apache.beam.sdk.extensions.sql.udf` (maybe `util`)?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+    @Override
+    public TypeDescriptor<OutputT> getOutputType() {

Review comment:
       reformatted



