Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/07 10:10:11 UTC

[GitHub] [flink] wuchong commented on a diff in pull request #19715: [FLINK-27606][table] Fix code generator exception when using imperative udaf that is introduced for planner scala-free

wuchong commented on code in PR #19715:
URL: https://github.com/apache/flink/pull/19715#discussion_r891031052


##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SQLCodegenE2EITCase.java:
##########
@@ -0,0 +1,198 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.cache.DownloadCache;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.tests.util.flink.SQLJobSubmission;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+/** End to End tests for sql codegen. */
+@RunWith(Parameterized.class)
+public class SQLCodegenE2EITCase extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SQLCodegenE2EITCase.class);
+
+    private static final String CODEGEN_E2E_SQL = "codegen_e2e.sql";
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    private static Configuration getConfiguration() {
+        // we have to enable checkpoint to trigger flushing for filesystem sink
+        final Configuration flinkConfig = new Configuration();
+        flinkConfig.setString("execution.checkpointing.interval", "5s");
+        return flinkConfig;
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory()
+                    .create(
+                            FlinkResourceSetup.builder()
+                                    .addConfiguration(getConfiguration())
+                                    .build());
+
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+    private final String executionMode;
+    private Path result;
+
+    @ClassRule public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get();
+
+    private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar");
+
+    public SQLCodegenE2EITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @Before
+    public void before() throws Exception {
+        DOWNLOAD_CACHE.before();
+        Path tmpPath = tmp.getRoot().toPath();
+        LOG.info("The current temporary path: {}", tmpPath);
+        this.result = tmpPath.resolve("result");
+    }
+
+    @Test
+    public void testImperativeUdaf() throws Exception {
+        try (ClusterController clusterController = flink.startCluster(1)) {
+            // Initialize the SQL statements from "codegen_e2e.sql" file
+            Map<String, String> varsMap = new HashMap<>();
+            varsMap.put("$RESULT", this.result.toAbsolutePath().toString());
+            varsMap.put("$MODE", this.executionMode);
+
+            List<String> sqlLines = initializeSqlLines(varsMap);
+
+            // Execute SQL statements in "codegen_e2e.sql" file
+            executeSqlStatements(clusterController, sqlLines);
+
+            // Wait until all the results flushed to the json file.
+            LOG.info("Verify the json result.");
+            checkJsonResultFile();
+            LOG.info("The codegen SQL client test run successfully.");
+        }
+    }
+
+    private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
+            throws IOException {
+        LOG.info("Executing end-to-end SQL statements {}.", sqlLines);
+        clusterController.submitSQLJob(
+                new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                        .addJar(sqlToolBoxJar)
+                        .build(),
+                Duration.ofMinutes(2L));
+    }
+
+    private List<String> initializeSqlLines(Map<String, String> vars) throws IOException {
+        URL url = SQLCodegenE2EITCase.class.getClassLoader().getResource(CODEGEN_E2E_SQL);
+        if (url == null) {
+            throw new FileNotFoundException(CODEGEN_E2E_SQL);
+        }
+
+        List<String> lines = Files.readAllLines(new File(url.getFile()).toPath());
+        List<String> result = new ArrayList<>();
+        for (String line : lines) {
+            for (Map.Entry<String, String> var : vars.entrySet()) {
+                line = line.replace(var.getKey(), var.getValue());
+            }
+            result.add(line);
+        }
+
+        return result;
+    }
+
+    private void checkJsonResultFile() throws Exception {
+        boolean success = false;
+        final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(20));
+        while (deadline.hasTimeLeft()) {
+            if (Files.exists(result)) {
+                List<String> lines = readJsonResultFiles(result);
+                if (lines.size() == 2) {
+                    success = true;
+                    assertThat(
+                            lines.toArray(new String[0]),
+                            arrayContainingInAnyOrder(
+                                    "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
+                                    "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+                    break;
+                } else {
+                    LOG.info(
+                            "The target Json {} does not contain enough records, current {} records, left time: {}s",
+                            result,
+                            lines.size(),
+                            deadline.timeLeft().getSeconds());
+                }
+            } else {
+                LOG.info("The target Json {} does not exist now", result);
+            }
+            Thread.sleep(500);
+        }
+        Assert.assertTrue("Did not get expected results before timeout.", success);

Review Comment:
   Please print the content of the result file if there is any. It would help with debugging when the test does not pass on Azure.
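   
   Something like this at the end of `checkJsonResultFile` would do (just a sketch, reusing the `readJsonResultFiles` helper already defined in this class):
   
   ```java
   // Sketch only: before failing, dump whatever the result file currently
   // contains so the Azure build log shows the records we actually got.
   if (!success && Files.exists(result)) {
       LOG.error(
               "Result file {} content at timeout: {}",
               result,
               readJsonResultFiles(result));
   }
   Assert.assertTrue("Did not get expected results before timeout.", success);
   ```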



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/log4j2-test.properties:
##########
@@ -0,0 +1,34 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO

Review Comment:
   Please turn off the logger by default to avoid flooding the build logs.
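   
   For reference, the default could look like this (a sketch that just flips the level, consistent with the comment above it):
   
   ```properties
   # Set root logger level to OFF to not flood build logs
   # set manually to INFO for debugging purposes
   rootLogger.level = OFF
   ```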



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SQLCodegenE2EITCase.java:
##########
@@ -0,0 +1,198 @@
+/** End to End tests for sql codegen. */
+@RunWith(Parameterized.class)
+public class SQLCodegenE2EITCase extends TestLogger {

Review Comment:
   Could you add more comments on this class to describe why we need this test?
   It is confusing why a code generation test is not placed in the planner module.
   Maybe we can rename this test to `PlannerScalaFreeITCase` to make the purpose more explicit.
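   
   For example, the class Javadoc could be expanded along these lines (wording is only a sketch):
   
   ```java
   /**
    * End-to-end test that verifies the code generator works for imperative
    * UDAFs after the planner became Scala-free (FLINK-27606). It submits SQL
    * jobs to a real standalone cluster, which is why it lives in the
    * end-to-end module rather than the planner module.
    */
   ```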



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org