You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/02 17:23:14 UTC

[GitHub] [flink] dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference

dawidwys commented on a change in pull request #10960: [FLINK-15487][table] Update code generation for new type inference
URL: https://github.com/apache/flink/pull/10960#discussion_r373860780
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
 ##########
 @@ -421,4 +431,173 @@ private void testUserDefinedCatalogFunction(String createFunctionDDL) throws Exc
 		tEnv().sqlUpdate("drop table t1");
 		tEnv().sqlUpdate("drop table t2");
 	}
+
+	@Test
+	public void testPrimitiveScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1, 1L, "-"),
+			Row.of(2, 2L, "--"),
+			Row.of(3, 3L, "---")
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, 3L, "-"),
+			Row.of(2, 6L, "--"),
+			Row.of(3, 9L, "---")
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate("CREATE TABLE TestTable(a INT NOT NULL, b BIGINT NOT NULL, c STRING) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("PrimitiveScalarFunction", PrimitiveScalarFunction.class);
+		tEnv().sqlUpdate("INSERT INTO TestTable SELECT a, PrimitiveScalarFunction(a, b, c), c FROM TestTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testComplexScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1, new byte[]{1, 2, 3}),
+			Row.of(2, new byte[]{2, 3, 4}),
+			Row.of(3, new byte[]{3, 4, 5})
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, "1+2012-12-12 12:12:12.123456789", "[1, 2, 3]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[1, 2, 3]"),
+			Row.of(2, "2+2012-12-12 12:12:12.123456789", "[2, 3, 4]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[2, 3, 4]"),
+			Row.of(3, "3+2012-12-12 12:12:12.123456789", "[3, 4, 5]+2012-12-12 12:12:12.123456789", new BigDecimal("123.40"), "[3, 4, 5]")
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate(
+			"CREATE TABLE SourceTable(i INT, b BYTES) " +
+			"WITH ('connector' = 'COLLECTION')");
+		tEnv().sqlUpdate(
+			"CREATE TABLE SinkTable(i INT, s1 STRING, s2 STRING, d DECIMAL(5, 2), s3 STRING) " +
+			"WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("ComplexScalarFunction", ComplexScalarFunction.class);
+		tEnv().sqlUpdate(
+			"INSERT INTO SinkTable " +
+			"SELECT " +
+			"  i, " +
+			"  ComplexScalarFunction(i, TIMESTAMP '2012-12-12 12:12:12.123456789'), " +
+			"  ComplexScalarFunction(b, TIMESTAMP '2012-12-12 12:12:12.123456789')," +
+			"  ComplexScalarFunction(), " +
+			"  ComplexScalarFunction(b) " +
+			"FROM SourceTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testCustomScalarFunction() throws Exception {
+		final List<Row> sourceData = Arrays.asList(
+			Row.of(1),
+			Row.of(2),
+			Row.of(3)
+		);
+
+		final List<Row> sinkData = Arrays.asList(
+			Row.of(1, 1, 5),
+			Row.of(2, 2, 5),
+			Row.of(3, 3, 5)
+		);
+
+		TestCollectionTableFactory.reset();
+		TestCollectionTableFactory.initData(sourceData);
+
+		tEnv().sqlUpdate("CREATE TABLE SourceTable(i INT) WITH ('connector' = 'COLLECTION')");
+		tEnv().sqlUpdate("CREATE TABLE SinkTable(i1 INT, i2 INT, i3 INT) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);
+		tEnv().sqlUpdate(
+			"INSERT INTO SinkTable " +
+			"SELECT " +
+			"  i, " +
+			"  CustomScalarFunction(i), " +
+			"  CustomScalarFunction(CAST(NULL AS INT), 5, i, i) " +
+			"FROM SourceTable");
+		tEnv().execute("Test Job");
+
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
+	}
+
+	@Test
+	public void testInvalidCustomScalarFunction() {
+		tEnv().sqlUpdate("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
+
+		tEnv().createTemporarySystemFunction("CustomScalarFunction", CustomScalarFunction.class);
+		try {
+			tEnv().sqlUpdate(
+				"INSERT INTO SinkTable " +
+				"SELECT CustomScalarFunction('test')");
+			fail();
+		} catch (CodeGenException e) {
+			assertThat(
+				e,
+				hasMessage(
+					equalTo(
+						"Could not find an implementation method that matches the following " +
+							"signature: java.lang.String eval(java.lang.String)")));
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Test functions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Function that takes and returns primitives.
+	 */
+	public static class PrimitiveScalarFunction extends ScalarFunction {
+		public long eval(int i, long l, String s) {
+			return i + l + s.length();
+		}
+	}
+
+	/**
+	 * Function that is overloaded and makes use of annotations.
+	 */
+	public static class ComplexScalarFunction extends ScalarFunction {
+		public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o, java.sql.Timestamp t) {
+			return EncodingUtils.objectToString(o) + "+" + t.toString();
+		}
+
+		public @DataTypeHint("DECIMAL(5, 2)") BigDecimal eval() {
+			return new BigDecimal("123.4"); // 1 digit is missing
+		}
+
+		public String eval(byte[] bytes) {
+			return Arrays.toString(bytes);
+		}
+	}
+
+	/**
+	 * Function that has a custom type inference that is broader than the actual implementation.
+	 */
+	public static class CustomScalarFunction extends ScalarFunction {
+		public Integer eval(Integer... args) {
+			for (Integer o : args) {
+				if (o != null) {
+					return o;
+				}
+			}
+			return null;
+		}
+
+		@Override
+		public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+			return TypeInference.newBuilder()
+				.outputTypeStrategy(TypeStrategies.argument(0))
 
 Review comment:
   Should we make the `inputTypeStrategy` required as well, instead of defaulting it to `WILDCARD`? I think we should not assume anything on behalf of users who define their own type inference strategies.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services