You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by gl...@apache.org on 2019/03/12 21:35:30 UTC

[beam] branch master updated: [BEAM-6810] Disable CalcRemoveRule

This is an automated email from the ASF dual-hosted git repository.

gleb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aefbe14  [BEAM-6810] Disable CalcRemoveRule
     new 30633b2  Merge pull request #8035: [BEAM-6810] Disable CalcRemoveRule to fix trivial projections
aefbe14 is described below

commit aefbe1459a9e0715aaccdc419becc351250b27b8
Author: Gleb Kanterov <gl...@spotify.com>
AuthorDate: Tue Mar 12 11:56:55 2019 +0100

    [BEAM-6810] Disable CalcRemoveRule
    
    Trivial programs project precisely their input fields, without dropping
    or re-ordering them. It turns out that CalcRemoveRule eliminates them
    even when a projection aliases a field, so the alias is lost and the
    SQL query ends up with an unexpected output schema.
    
    It isn't clear whether this is an issue in Calcite or in the Beam rules;
    as a workaround, we disable CalcRemoveRule.
---
 .../extensions/sql/impl/planner/BeamRuleSets.java  |  4 +--
 .../beam/sdk/extensions/sql/BeamSqlCliTest.java    |  8 +++--
 .../sdk/extensions/sql/BeamSqlDslProjectTest.java  | 39 ++++++++++++++++++----
 3 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index 51367c6..38cfcc9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -40,7 +40,6 @@ import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
 import org.apache.calcite.rel.rules.AggregateRemoveRule;
 import org.apache.calcite.rel.rules.AggregateUnionAggregateRule;
 import org.apache.calcite.rel.rules.CalcMergeRule;
-import org.apache.calcite.rel.rules.CalcRemoveRule;
 import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
 import org.apache.calcite.rel.rules.FilterCalcMergeRule;
 import org.apache.calcite.rel.rules.FilterJoinRule;
@@ -74,7 +73,8 @@ public class BeamRuleSets {
           ProjectCalcMergeRule.INSTANCE,
           FilterToCalcRule.INSTANCE,
           ProjectToCalcRule.INSTANCE,
-          CalcRemoveRule.INSTANCE,
+          // disabled due to https://issues.apache.org/jira/browse/BEAM-6810
+          // CalcRemoveRule.INSTANCE,
           CalcMergeRule.INSTANCE,
 
           // push a filter into a join
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index 0774039..214c4fa 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -22,11 +22,11 @@ import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.INTEGER
 import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARCHAR;
 import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
 import static org.apache.beam.sdk.schemas.Schema.toSchema;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 
 import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.sql.impl.ParseException;
@@ -232,7 +232,11 @@ public class BeamSqlCliTest {
             + "COMMENT '' LOCATION '/home/admin/orders'");
 
     String plan = cli.explainQuery("select * from person");
-    assertThat(plan, equalTo("BeamIOSourceRel(table=[[beam, person]])\n"));
+    assertThat(
+        plan,
+        equalTo(
+            "BeamCalcRel(expr#0..2=[{inputs}], proj#0..2=[{exprs}])\n"
+                + "  BeamIOSourceRel(table=[[beam, person]])\n"));
   }
 
   @Test
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index ec7db20..068aeee 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -26,10 +26,12 @@ import java.util.stream.IntStream;
 import org.apache.beam.sdk.extensions.sql.impl.ParseException;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Assert;
 import org.junit.Test;
 
 /** Tests for field-project in queries with BOUNDED PCollection. */
@@ -53,7 +55,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
 
     PAssert.that(result).containsInAnyOrder(rowsInTableA.get(0));
 
-    pipeline.run().waitUntilFinish();
+    pipeline.run();
   }
 
   /** select partial fields with bounded PCollection. */
@@ -81,7 +83,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
 
     PAssert.that(result).containsInAnyOrder(row);
 
-    pipeline.run().waitUntilFinish();
+    pipeline.run();
   }
 
   /** select partial fields for multiple rows with bounded PCollection. */
@@ -110,7 +112,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
 
     PAssert.that(result).containsInAnyOrder(expectedRows);
 
-    pipeline.run().waitUntilFinish();
+    pipeline.run();
   }
 
   private Row rowAtIndex(Schema schema, int index) {
@@ -145,7 +147,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
 
     PAssert.that(result).containsInAnyOrder(expectedRows);
 
-    pipeline.run().waitUntilFinish();
+    pipeline.run();
   }
 
   /** select literal field with bounded PCollection. */
@@ -173,7 +175,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
 
     PAssert.that(result).containsInAnyOrder(row);
 
-    pipeline.run().waitUntilFinish();
+    pipeline.run();
   }
 
   @Test
@@ -188,6 +190,31 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<>("TABLE_A"), boundedInput1)
             .apply("testProjectUnknownField", SqlTransform.query(sql));
 
-    pipeline.run().waitUntilFinish();
+    pipeline.run();
+  }
+
+  /**
+   * Trivial programs project precisely their input fields, without dropping or re-ordering them.
+   *
+   * @see <a href="https://issues.apache.org/jira/browse/BEAM-6810">BEAM-6810</a>
+   */
+  @Test
+  public void testTrivialProjection() {
+    String sql = "SELECT c_int64 as abc FROM PCOLLECTION";
+    Schema inputSchema = Schema.of(Schema.Field.of("c_int64", Schema.FieldType.INT64));
+    Schema outputSchema = Schema.of(Schema.Field.of("abc", Schema.FieldType.INT64));
+
+    PCollection<Row> input =
+        pipeline.apply(
+            Create.of(Row.withSchema(inputSchema).addValue(42L).build())
+                .withRowSchema(inputSchema));
+
+    PCollection<Row> result = input.apply(SqlTransform.query(sql));
+
+    Assert.assertEquals(outputSchema, result.getSchema());
+
+    PAssert.that(result).containsInAnyOrder(Row.withSchema(outputSchema).addValue(42L).build());
+
+    pipeline.run();
   }
 }