Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/10 16:15:58 UTC

[GitHub] [iceberg] hililiwei opened a new pull request, #6402: Flink: Add UT for NaN

hililiwei opened a new pull request, #6402:
URL: https://github.com/apache/iceberg/pull/6402

   cc @stevenzwu, I am not sure whether this PR is feasible; please help review it.
   Thanks.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050277049


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkFilters.java:
##########
@@ -246,18 +248,70 @@ private static Optional<Expression> convertFieldAndLiteral(
     org.apache.flink.table.expressions.Expression left = args.get(0);
     org.apache.flink.table.expressions.Expression right = args.get(1);
 
-    if (left instanceof FieldReferenceExpression && right instanceof ValueLiteralExpression) {
-      String name = ((FieldReferenceExpression) left).getName();
-      Optional<Object> lit = convertLiteral((ValueLiteralExpression) right);
+    Optional<Object> lit;
+    if (left instanceof FieldReferenceExpression) {
+      lit = convertExpression(right);
       if (lit.isPresent()) {
-        return Optional.of(convertLR.apply(name, lit.get()));
+        return Optional.of(convertLR.apply(((FieldReferenceExpression) left).getName(), lit.get()));
       }
-    } else if (left instanceof ValueLiteralExpression
-        && right instanceof FieldReferenceExpression) {
-      Optional<Object> lit = convertLiteral((ValueLiteralExpression) left);
-      String name = ((FieldReferenceExpression) right).getName();
+    } else if (right instanceof FieldReferenceExpression) {
+      lit = convertExpression(left);
       if (lit.isPresent()) {
-        return Optional.of(convertRL.apply(name, lit.get()));
+        return Optional.of(
+            convertRL.apply(((FieldReferenceExpression) right).getName(), lit.get()));
+      }
+    }
+
+    return Optional.empty();
+  }
+
+  private static Optional<Object> convertExpression(
+      org.apache.flink.table.expressions.Expression expression) {
+    if (expression instanceof ValueLiteralExpression) {
+      return convertLiteral((ValueLiteralExpression) expression);
+    } else if (expression instanceof CallExpression) {
+      return convertCallExpression((CallExpression) expression);
+    }
+    return Optional.empty();
+  }
+
+  private static Optional<Object> convertCallExpression(CallExpression call) {
+    if (!BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) {
+      return Optional.empty();
+    }
+
+    List<ResolvedExpression> args = call.getResolvedChildren();
+    if (args.size() != 2) {
+      return Optional.empty();
+    }
+
+    org.apache.flink.table.expressions.Expression left = args.get(0);
+    org.apache.flink.table.expressions.Expression right = args.get(1);
+
+    if (left instanceof ValueLiteralExpression && right instanceof TypeLiteralExpression) {
+      ValueLiteralExpression value = (ValueLiteralExpression) left;
+      TypeLiteralExpression type = (TypeLiteralExpression) right;
+
+      LogicalType logicalType = type.getOutputDataType().getLogicalType();
+
+      Optional<?> result = value.getValueAs(logicalType.getDefaultConversion());
+      if (result.isPresent()) {
+        return Optional.of(result.get());
+      }
+
+      switch (logicalType.getTypeRoot()) {
+        case DOUBLE:
+          Optional<String> strValue = value.getValueAs(String.class);
+          if (strValue.isPresent()) {
+            return Optional.of(Double.valueOf(strValue.get()));

Review Comment:
   I think this needs to handle `NumberFormatException` in case someone uses an expression like `cast("fail" as double)`. You could put a try/catch around the switch and return `Optional.empty()`.
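A minimal sketch of the suggested guard, assuming the DOUBLE branch parses the literal's string form with `Double.valueOf` as in the diff above. The class `CastLiteralParsing` and helper `parseDoubleLiteral` are hypothetical and not part of `FlinkFilters`; in the PR itself the try/catch would wrap the `switch` rather than a separate method.

```java
import java.util.Optional;

final class CastLiteralParsing {

  // Hypothetical helper: parses the string operand of CAST(... AS DOUBLE).
  // Returns Optional.empty() instead of throwing, so an unparseable literal
  // such as CAST('fail' AS DOUBLE) is simply not converted for push-down.
  static Optional<Object> parseDoubleLiteral(String strValue) {
    try {
      return Optional.of(Double.valueOf(strValue));
    } catch (NumberFormatException e) {
      // Not a valid double: skip converting this predicate.
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    System.out.println(parseDoubleLiteral("1.5"));  // Optional[1.5]
    System.out.println(parseDoubleLiteral("NaN"));  // Optional[NaN]
    System.out.println(parseDoubleLiteral("fail")); // Optional.empty
  }
}
```

Returning an empty `Optional` matches how `convertExpression` already signals "cannot convert", so the caller needs no extra handling.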





[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050278409


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##########
@@ -603,7 +603,108 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-    // todo add some test case to test NaN
+  public void testFilterNaN() {
+    final String tableName = "test_table_nan";
+    try {
+      sql(
+          "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')",
+          tableName, format.name());
+      sql(
+          "INSERT INTO %s VALUES (1,'iceberg',10, 1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS DOUBLE),4.4)",
+          tableName);
+
+      String sqlNaNDoubleEqual =
+          String.format("SELECT * FROM %s  WHERE d = CAST('NaN' AS DOUBLE)  ", tableName);
+      List<Row> resultDoubleEqual = sql(sqlNaNDoubleEqual);
+      Assert.assertEquals("Should have 0 records", 0, resultDoubleEqual.size());

Review Comment:
   This should have 1 record.





[GitHub] [iceberg] rdblue commented on pull request #6402: Flink: Add UT for NaN

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#issuecomment-1354038557

   @hililiwei, I flagged the test cases in my review, but I now see that @stevenzwu did as well.
   
   The problem is that NaN comparison should always result in `false`. That's why Iceberg doesn't allow `NaN` as a literal value in expressions. If you try to create an expression with a [`NaN` literal, it will fail](https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/expressions/Literals.java#L61).
   
   That forces callers to be specific. If the caller intends to compare with `NaN`, then the comparison should be `false` and there's no need to pass it to Iceberg. If the caller intends to check whether a value is `NaN`, then there is the [`isNaN` unary predicate](https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/expressions/Expressions.java#LL125C28-L125C28).
   
   It is up to Flink how to map expressions. In Spark, `NaN` is a valid comparison, so Spark filter translation converts `x = NaN` to `isNaN(x)`. Flink could do something similar.
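A minimal, self-contained sketch of a Spark-style mapping as described above. It uses no Flink or Iceberg classes: predicates are rendered as strings, and the names `NaNAwareTranslation`, `translate`, and `Op` are hypothetical. In a real `FlinkFilters` change, the NaN branches would presumably build Iceberg's `isNaN`/`notNaN` predicates instead of strings. Mapping `x <> NaN` to `notNaN(x)` is an assumption here, chosen to match the 3-row expectation discussed later in this thread; other comparisons with NaN are not converted, so they are never passed to Iceberg.

```java
import java.util.Optional;

final class NaNAwareTranslation {

  // Hypothetical subset of comparison operators seen in a Flink filter.
  enum Op { EQ, NOT_EQ, LT, GT }

  // Translates "field <op> literal" into a textual Iceberg-style predicate.
  // A NaN value is never emitted as a literal, mirroring Iceberg's rule
  // that NaN literals are rejected.
  static Optional<String> translate(String field, Op op, double literal) {
    if (Double.isNaN(literal)) {
      switch (op) {
        case EQ:
          return Optional.of("isNaN(" + field + ")");
        case NOT_EQ:
          // Assumption: treat "x <> NaN" as "x is not NaN".
          return Optional.of("notNaN(" + field + ")");
        default:
          // Ordering comparisons with NaN: do not convert.
          return Optional.empty();
      }
    }
    String symbol;
    switch (op) {
      case EQ: symbol = "=="; break;
      case NOT_EQ: symbol = "!="; break;
      case LT: symbol = "<"; break;
      default: symbol = ">"; break;
    }
    return Optional.of(field + " " + symbol + " " + literal);
  }

  public static void main(String[] args) {
    System.out.println(translate("d", Op.EQ, Double.NaN));     // Optional[isNaN(d)]
    System.out.println(translate("d", Op.NOT_EQ, Double.NaN)); // Optional[notNaN(d)]
    System.out.println(translate("d", Op.LT, Double.NaN));     // Optional.empty
    System.out.println(translate("d", Op.EQ, 10.0));           // Optional[d == 10.0]
  }
}
```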




[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050278958


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##########
@@ -603,7 +605,103 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-    // todo add some test case to test NaN
+  public void testFilterNaN() {
+    sql(
+        "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')",
+        TABLE_NAME_NAN, format.name());
+    sql(
+        "INSERT INTO %s VALUES (1,'iceberg',10, 1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS DOUBLE),4.4)",
+        TABLE_NAME_NAN);
+
+    String sqlNaNDoubleEqual =
+        String.format("SELECT * FROM %s  WHERE d = CAST('NaN' AS DOUBLE)  ", TABLE_NAME_NAN);
+    List<Row> resultDoubleEqual = sql(sqlNaNDoubleEqual);
+    Assert.assertEquals("Should have 0 records", 0, resultDoubleEqual.size());

Review Comment:
   See my note below on this.





[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050277901


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##########
@@ -603,7 +603,108 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-    // todo add some test case to test NaN
+  public void testFilterNaN() {
+    final String tableName = "test_table_nan";
+    try {
+      sql(
+          "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')",
+          tableName, format.name());
+      sql(
+          "INSERT INTO %s VALUES (1,'iceberg',10, 1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS DOUBLE),4.4)",
+          tableName);
+
+      String sqlNaNDoubleEqual =
+          String.format("SELECT * FROM %s  WHERE d = CAST('NaN' AS DOUBLE)  ", tableName);
+      List<Row> resultDoubleEqual = sql(sqlNaNDoubleEqual);
+      Assert.assertEquals("Should have 0 records", 0, resultDoubleEqual.size());
+
+      String sqlNaNDoubleNotEqual =
+          String.format("SELECT * FROM %s  WHERE d <> CAST('NaN' AS DOUBLE)  ", tableName);
+      List<Row> resultDoubleNotEqual = sql(sqlNaNDoubleNotEqual);
+      List<Row> expectedDouble =
+          Lists.newArrayList(
+              Row.of(1, "iceberg", 10.0d, 1.1f),
+              Row.of(2, "b", 20.0d, 2.2f),
+              Row.of(3, null, 30.0d, 3.3f),
+              Row.of(4, "d", Double.NaN, 4.4f));

Review Comment:
   Why is this matching? The field is NaN.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6402: Flink: Add UT for NaN

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1048054508


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##########
@@ -603,7 +605,103 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-    // todo add some test case to test NaN
+  public void testFilterNaN() {
+    sql(
+        "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')",

Review Comment:
   Should I delete the tests for `Float.NaN` as well?
   
   





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6402: Flink: Add UT for NaN

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1046618093


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##########
@@ -603,7 +605,103 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-    // todo add some test case to test NaN
+  public void testFilterNaN() {
+    sql(
+        "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')",
+        TABLE_NAME_NAN, format.name());
+    sql(
+        "INSERT INTO %s VALUES (1,'iceberg',10, 1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS DOUBLE),4.4)",
+        TABLE_NAME_NAN);
+
+    String sqlNaNDoubleEqual =
+        String.format("SELECT * FROM %s  WHERE d = CAST('NaN' AS DOUBLE)  ", TABLE_NAME_NAN);
+    List<Row> resultDoubleEqual = sql(sqlNaNDoubleEqual);
+    Assert.assertEquals("Should have 0 records", 0, resultDoubleEqual.size());

Review Comment:
   Yes, I totally agree with you. I was stuck on this for a long time, until a fellow database expert told me that, under IEEE 754, NaN is not equal to any value, including itself (ref: https://en.wikipedia.org/wiki/NaN). Flink seems to follow this standard, while Iceberg doesn't.
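The IEEE 754 rule can be checked directly in Java. Primitive `==` follows IEEE 754, while `Double.equals` and `Double.compare` deliberately treat NaN as equal to itself (and `compare` orders it above positive infinity), which is one way Java code can end up with NaN semantics that differ from SQL comparisons. This is an illustrative snippet only; it makes no claim about which of these Flink or Iceberg use internally.

```java
final class NaNComparisons {
  public static void main(String[] args) {
    double nan = Double.NaN;
    // IEEE 754: NaN is unordered, so == is false and != is true, even against itself.
    System.out.println(nan == nan);             // false
    System.out.println(nan != nan);             // true
    System.out.println(nan < 1.0 || nan > 1.0); // false
    // Boxed equality and Double.compare's total ordering treat NaN as equal to itself.
    System.out.println(Double.valueOf(nan).equals(nan));               // true
    System.out.println(Double.compare(nan, nan));                      // 0
    System.out.println(Double.compare(nan, Double.POSITIVE_INFINITY)); // 1
    // The reliable way to test for NaN.
    System.out.println(Double.isNaN(nan));      // true
  }
}
```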
   





[GitHub] [iceberg] rdblue commented on a diff in pull request #6402: Flink: Add UT for NaN

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050275269


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkFilters.java:
##########
@@ -246,18 +248,70 @@ private static Optional<Expression> convertFieldAndLiteral(
     org.apache.flink.table.expressions.Expression left = args.get(0);
     org.apache.flink.table.expressions.Expression right = args.get(1);
 
-    if (left instanceof FieldReferenceExpression && right instanceof ValueLiteralExpression) {
-      String name = ((FieldReferenceExpression) left).getName();
-      Optional<Object> lit = convertLiteral((ValueLiteralExpression) right);
+    Optional<Object> lit;
+    if (left instanceof FieldReferenceExpression) {
+      lit = convertExpression(right);
       if (lit.isPresent()) {
-        return Optional.of(convertLR.apply(name, lit.get()));
+        return Optional.of(convertLR.apply(((FieldReferenceExpression) left).getName(), lit.get()));

Review Comment:
   I don't see a reason to remove the `name` variable in this block or the mirror one. That introduces needless changes. Can you roll that change back?





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6402: Flink: Add UT for NaN

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6402:
URL: https://github.com/apache/iceberg/pull/6402#discussion_r1045171171


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##########
@@ -96,6 +97,7 @@ public void before() {
   @After
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME);
+    sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME_NAN);

Review Comment:
   It might be better to do this with try-finally in the `testFilterNaN` method, since this cleanup does not apply to other tests. That saves one SQL statement for every other test.
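A minimal sketch of the suggested try/finally structure. The `sql` method here is a hypothetical stand-in for the test class's helper that only records formatted statements, so the example runs without Flink; `DATABASE_NAME` and `TABLE_NAME_NAN` mirror the constants in the diff, but their values are made up.

```java
import java.util.ArrayList;
import java.util.List;

final class TryFinallyCleanup {
  // Hypothetical values; the real test class defines its own constants.
  static final String DATABASE_NAME = "db";
  static final String TABLE_NAME_NAN = "test_table_nan";

  // Stand-in for the test's sql(...) helper: records each formatted statement.
  static final List<String> executed = new ArrayList<>();

  static void sql(String query, Object... args) {
    executed.add(String.format(query, args));
  }

  static void testFilterNaN(boolean failMidway) {
    try {
      sql("CREATE TABLE %s (id INT, data VARCHAR, d DOUBLE)", TABLE_NAME_NAN);
      if (failMidway) {
        throw new AssertionError("simulated assertion failure");
      }
      sql("SELECT * FROM %s", TABLE_NAME_NAN);
    } finally {
      // Runs whether the body passes or fails; only this test pays for the DROP.
      sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME_NAN);
    }
  }

  public static void main(String[] args) {
    try {
      testFilterNaN(true);
    } catch (AssertionError expected) {
      // The failure still propagates to the caller after the cleanup ran.
    }
    System.out.println(executed);
  }
}
```

With this shape, the shared `@After` method keeps dropping only `TABLE_NAME`.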



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##########
@@ -603,7 +605,103 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-    // todo add some test case to test NaN
+  public void testFilterNaN() {
+    sql(
+        "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')",

Review Comment:
   It's probably not necessary to have the fourth column, `f FLOAT`.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##########
@@ -603,7 +605,103 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-    // todo add some test case to test NaN
+  public void testFilterNaN() {
+    sql(
+        "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')",
+        TABLE_NAME_NAN, format.name());
+    sql(
+        "INSERT INTO %s VALUES (1,'iceberg',10, 1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS DOUBLE),4.4)",
+        TABLE_NAME_NAN);
+
+    String sqlNaNDoubleEqual =
+        String.format("SELECT * FROM %s  WHERE d = CAST('NaN' AS DOUBLE)  ", TABLE_NAME_NAN);
+    List<Row> resultDoubleEqual = sql(sqlNaNDoubleEqual);
+    Assert.assertEquals("Should have 0 records", 0, resultDoubleEqual.size());

Review Comment:
   In theory, this should return 1 record. When I investigated this myself a few weeks ago, I ran into the same problem: the `NaN` WHERE clause didn't work correctly, and I'm not sure where the problem is.
   
   We can add some comments to explain the unexpected behavior and keep this test as it is for now, then follow up in a separate PR, since this requires a deeper dig.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java:
##########
@@ -603,7 +605,103 @@ public void testFilterPushDown2Literal() {
   }
 
   @Test
-  public void testSqlParseNaN() {
-    // todo add some test case to test NaN
+  public void testFilterNaN() {
+    sql(
+        "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE, f FLOAT) WITH ('write.format.default'='%s')",
+        TABLE_NAME_NAN, format.name());
+    sql(
+        "INSERT INTO %s VALUES (1,'iceberg',10, 1.1),(2,'b',20,2.2),(3,CAST(NULL AS VARCHAR),30,3.3),(4,'d',CAST('NaN' AS DOUBLE),4.4)",
+        TABLE_NAME_NAN);
+
+    String sqlNaNDoubleEqual =
+        String.format("SELECT * FROM %s  WHERE d = CAST('NaN' AS DOUBLE)  ", TABLE_NAME_NAN);
+    List<Row> resultDoubleEqual = sql(sqlNaNDoubleEqual);
+    Assert.assertEquals("Should have 0 records", 0, resultDoubleEqual.size());
+
+    String sqlNaNDoubleNotEqual =
+        String.format("SELECT * FROM %s  WHERE d <> CAST('NaN' AS DOUBLE)  ", TABLE_NAME_NAN);
+    List<Row> resultDoubleNotEqual = sql(sqlNaNDoubleNotEqual);
+    List<Row> expectedDouble =
+        Lists.newArrayList(
+            Row.of(1, "iceberg", 10.0d, 1.1f),
+            Row.of(2, "b", 20.0d, 2.2f),
+            Row.of(3, null, 30.0d, 3.3f),
+            Row.of(4, "d", Double.NaN, 4.4f));
+    Assert.assertEquals("Should have 4 records", 4, resultDoubleNotEqual.size());

Review Comment:
   Same problem with the `NaN` WHERE clause. In theory, this should return 3 records.
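A small worked check of the counts discussed in this thread, using the four `d` values the test inserts (10, 20, 30, NaN). It evaluates the predicates in plain Java rather than through Flink, so it only illustrates the two semantics: IEEE 754 `=`/`<>` against a NaN literal gives 0 and 4 rows, while `isNaN`/`notNaN`-style checks give the 1 and 3 rows the reviewers expected. The class name `NaNFilterCounts` is hypothetical.

```java
import java.util.function.DoublePredicate;
import java.util.stream.DoubleStream;

final class NaNFilterCounts {
  // The d column values inserted by testFilterNaN.
  static final double[] D = {10.0, 20.0, 30.0, Double.NaN};

  static long count(DoublePredicate predicate) {
    return DoubleStream.of(D).filter(predicate).count();
  }

  public static void main(String[] args) {
    double nan = Double.NaN;
    // IEEE 754 comparison against a NaN literal.
    System.out.println(count(d -> d == nan)); // 0
    System.out.println(count(d -> d != nan)); // 4
    // isNaN / notNaN semantics.
    System.out.println(count(Double::isNaN));         // 1
    System.out.println(count(d -> !Double.isNaN(d))); // 3
  }
}
```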


