You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/16 17:47:02 UTC

[GitHub] [beam] dpcollins-google opened a new pull request #14254: Fix nullability issues with BeamZetaSqlCalcRel

dpcollins-google opened a new pull request #14254:
URL: https://github.com/apache/beam/pull/14254


   Remove a misimplemented optimization that leads to failed null checks in BeamZetaSqlCalcRel
   
   Also fix nullability of this class in general.
   
   Caused by: java.lang.NullPointerException: Null future
           org.apache.beam.sdk.extensions.sql.zetasql.AutoValue_BeamZetaSqlCalcRel_TimestampedFuture.<init>(AutoValue_BeamZetaSqlCalcRel_TimestampedFuture.java:23)
           org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$TimestampedFuture.create(BeamZetaSqlCalcRel.java:104)
           org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$TimestampedFuture.access$500(BeamZetaSqlCalcRel.java:102)
           org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn.processElement(BeamZetaSqlCalcRel.java:254)
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam
 .apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.a
 pache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam
 .apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) <br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/be
 am_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #14254: [BEAM-10402] Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #14254:
URL: https://github.com/apache/beam/pull/14254#discussion_r599656112



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -299,7 +295,7 @@ private void outputRow(TimestampedFuture c, OutputReceiver<Row> r) throws Interr
       try {
         v = c.future().get();
       } catch (ExecutionException e) {
-        throw (RuntimeException) e.getCause();
+        throw new RuntimeException(checkArgumentNotNull(e.getCause()));

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] apilloud commented on a change in pull request #14254: [BEAM-10402] Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #14254:
URL: https://github.com/apache/beam/pull/14254#discussion_r597336721



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -299,7 +295,7 @@ private void outputRow(TimestampedFuture c, OutputReceiver<Row> r) throws Interr
       try {
         v = c.future().get();
       } catch (ExecutionException e) {
-        throw (RuntimeException) e.getCause();
+        throw new RuntimeException(checkArgumentNotNull(e.getCause()));

Review comment:
       Sorry, I missed this on the first pass. We want `RuntimeException` to be passed through without modification if possible. I believe this will work:
   ```
   if (e.getCause() instanceof RuntimeException) {
     throw (RuntimeException) e.getCause();
   }
   throw new RuntimeException(e);
   ```

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -169,12 +169,10 @@ private static TimestampedFuture create(Instant t, Future<Value> f) {
     private final Schema outputSchema;
     private final String defaultTimezone;
     private final boolean verifyRowValues;
+    private transient List<Integer> referencedColumns = ImmutableList.of();

Review comment:
       Findbugs is unhappy with these two lines. Tag them with `@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")`
   
   Also need: `import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #14254: [BEAM-10402] Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #14254:
URL: https://github.com/apache/beam/pull/14254#discussion_r596879682



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -229,24 +226,19 @@ public Duration getAllowedTimestampSkew() {
     public void processElement(
         @Element Row row, @Timestamp Instant t, BoundedWindow w, OutputReceiver<Row> r)
         throws InterruptedException {
-      final Future<Value> valueFuture;
-
-      if (row.equals(previousRow)) {
-        valueFuture = previousFuture;
-      } else {
-        Map<String, Value> columns = new HashMap<>();
-        for (int i : referencedColumns) {
-          columns.put(
-              columnName(i),
-              ZetaSqlBeamTranslationUtils.toZetaSqlValue(
-                  row.getBaseValue(i, Object.class), inputSchema.getField(i).getType()));
-        }
-
-        valueFuture = stream.execute(columns, nullParams);
+      Map<String, Value> columns = new HashMap<>();
+      for (int i : checkArgumentNotNull(referencedColumns)) {

Review comment:
       Done.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -201,23 +199,22 @@ public void setup() {
             ZetaSqlBeamTranslationUtils.toZetaSqlType(inputSchema.getField(i).getType()));
       }
 
-      exp = new PreparedExpression(sql);
-      exp.prepare(options);
+      PreparedExpression expression = new PreparedExpression(sql);
+      exp = expression;

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] apilloud merged pull request #14254: [BEAM-10402] Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
apilloud merged pull request #14254:
URL: https://github.com/apache/beam/pull/14254


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #14254: [BEAM-10402] Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #14254:
URL: https://github.com/apache/beam/pull/14254#discussion_r599656280



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -169,12 +169,10 @@ private static TimestampedFuture create(Instant t, Future<Value> f) {
     private final Schema outputSchema;
     private final String defaultTimezone;
     private final boolean verifyRowValues;
+    private transient List<Integer> referencedColumns = ImmutableList.of();

Review comment:
       fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on pull request #14254: Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #14254:
URL: https://github.com/apache/beam/pull/14254#issuecomment-800475835


   @apilloud 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kennknowles commented on a change in pull request #14254: [BEAM-10402] Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #14254:
URL: https://github.com/apache/beam/pull/14254#discussion_r599816059



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -59,6 +62,7 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.NonNull;

Review comment:
       This is the default so should not be needed, no?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google edited a comment on pull request #14254: Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
dpcollins-google edited a comment on pull request #14254:
URL: https://github.com/apache/beam/pull/14254#issuecomment-800475835


   R: @apilloud 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] apilloud commented on pull request #14254: [BEAM-10402] Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
apilloud commented on pull request #14254:
URL: https://github.com/apache/beam/pull/14254#issuecomment-800664737


   `./gradlew :sdks:java:extensions:sql:zetasql:spotlessApply` will fix the spotless failures.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] apilloud commented on a change in pull request #14254: [BEAM-10402] Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #14254:
URL: https://github.com/apache/beam/pull/14254#discussion_r595602239



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -229,24 +226,19 @@ public Duration getAllowedTimestampSkew() {
     public void processElement(
         @Element Row row, @Timestamp Instant t, BoundedWindow w, OutputReceiver<Row> r)
         throws InterruptedException {
-      final Future<Value> valueFuture;
-
-      if (row.equals(previousRow)) {
-        valueFuture = previousFuture;
-      } else {
-        Map<String, Value> columns = new HashMap<>();
-        for (int i : referencedColumns) {
-          columns.put(
-              columnName(i),
-              ZetaSqlBeamTranslationUtils.toZetaSqlValue(
-                  row.getBaseValue(i, Object.class), inputSchema.getField(i).getType()));
-        }
-
-        valueFuture = stream.execute(columns, nullParams);
+      Map<String, Value> columns = new HashMap<>();
+      for (int i : checkArgumentNotNull(referencedColumns)) {

Review comment:
       If you set `referencedColumns` to a default value in the constructor (say `self.referencedColumns = ImmutableList.of()`) you could drop the `@Nullable` and remove the `checkArgumentNotNull`.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -229,24 +226,19 @@ public Duration getAllowedTimestampSkew() {
     public void processElement(
         @Element Row row, @Timestamp Instant t, BoundedWindow w, OutputReceiver<Row> r)
         throws InterruptedException {
-      final Future<Value> valueFuture;
-
-      if (row.equals(previousRow)) {
-        valueFuture = previousFuture;
-      } else {
-        Map<String, Value> columns = new HashMap<>();
-        for (int i : referencedColumns) {
-          columns.put(
-              columnName(i),
-              ZetaSqlBeamTranslationUtils.toZetaSqlValue(
-                  row.getBaseValue(i, Object.class), inputSchema.getField(i).getType()));
-        }
-
-        valueFuture = stream.execute(columns, nullParams);
+      Map<String, Value> columns = new HashMap<>();
+      for (int i : checkArgumentNotNull(referencedColumns)) {
+        columns.put(
+            columnName(i),
+            ZetaSqlBeamTranslationUtils.toZetaSqlValue(
+                row.getBaseValue(i, Object.class),
+                inputSchema.getField(i).getType()));
       }
-      previousRow = row;
 
-      @Nullable Queue<TimestampedFuture> pendingWindow = pending.get(w);
+      @NonNull
+      Future<Value> valueFuture = checkArgumentNotNull(stream).execute(columns, nullParams);
+
+      @Nullable Queue<TimestampedFuture> pendingWindow = checkArgumentNotNull(pending).get(w);

Review comment:
       Add `this.pending = new HashMap<>();` to the constructor, then you can drop the nullness check here too.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -201,23 +199,22 @@ public void setup() {
             ZetaSqlBeamTranslationUtils.toZetaSqlType(inputSchema.getField(i).getType()));
       }
 
-      exp = new PreparedExpression(sql);
-      exp.prepare(options);
+      PreparedExpression expression = new PreparedExpression(sql);
+      exp = expression;

Review comment:
       I'm not really a fan of this. I believe you can move `self.exp = new PreparedExpression(sql);` to the constructor so `exp` doesn't need to be `@Nullable`? (You'll also need to add the line to `teardown` as a `PreparedExpression` can't be reused.) Then you should be able to revert the rest of the changes to this method. If that doesn't work, I'd prefer nullness checks in this method for consistency.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #14254: [BEAM-10402] Fix nullability issues with BeamZetaSqlCalcRel

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #14254:
URL: https://github.com/apache/beam/pull/14254#discussion_r596880207



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -229,24 +226,19 @@ public Duration getAllowedTimestampSkew() {
     public void processElement(
         @Element Row row, @Timestamp Instant t, BoundedWindow w, OutputReceiver<Row> r)
         throws InterruptedException {
-      final Future<Value> valueFuture;
-
-      if (row.equals(previousRow)) {
-        valueFuture = previousFuture;
-      } else {
-        Map<String, Value> columns = new HashMap<>();
-        for (int i : referencedColumns) {
-          columns.put(
-              columnName(i),
-              ZetaSqlBeamTranslationUtils.toZetaSqlValue(
-                  row.getBaseValue(i, Object.class), inputSchema.getField(i).getType()));
-        }
-
-        valueFuture = stream.execute(columns, nullParams);
+      Map<String, Value> columns = new HashMap<>();
+      for (int i : checkArgumentNotNull(referencedColumns)) {
+        columns.put(
+            columnName(i),
+            ZetaSqlBeamTranslationUtils.toZetaSqlValue(
+                row.getBaseValue(i, Object.class),
+                inputSchema.getField(i).getType()));
       }
-      previousRow = row;
 
-      @Nullable Queue<TimestampedFuture> pendingWindow = pending.get(w);
+      @NonNull
+      Future<Value> valueFuture = checkArgumentNotNull(stream).execute(columns, nullParams);
+
+      @Nullable Queue<TimestampedFuture> pendingWindow = checkArgumentNotNull(pending).get(w);

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org