You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by gl...@apache.org on 2019/03/12 21:35:30 UTC
[beam] branch master updated: [BEAM-6810] Disable CalcRemoveRule
This is an automated email from the ASF dual-hosted git repository.
gleb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new aefbe14 [BEAM-6810] Disable CalcRemoveRule
new 30633b2 Merge pull request #8035: [BEAM-6810] Disable CalcRemoveRule to fix trivial projections
aefbe14 is described below
commit aefbe1459a9e0715aaccdc419becc351250b27b8
Author: Gleb Kanterov <gl...@spotify.com>
AuthorDate: Tue Mar 12 11:56:55 2019 +0100
[BEAM-6810] Disable CalcRemoveRule
Trivial programs project exactly their input fields, without dropping
or reordering them. It turns out that CalcRemoveRule eliminates such
programs even when a field is aliased, so the SQL query produces an
output schema with the original field names instead of the aliases.
It isn't clear whether this is an issue in Calcite or in Beam's rules;
as a workaround, we disable CalcRemoveRule.
---
.../extensions/sql/impl/planner/BeamRuleSets.java | 4 +--
.../beam/sdk/extensions/sql/BeamSqlCliTest.java | 8 +++--
.../sdk/extensions/sql/BeamSqlDslProjectTest.java | 39 ++++++++++++++++++----
3 files changed, 41 insertions(+), 10 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index 51367c6..38cfcc9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -40,7 +40,6 @@ import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
import org.apache.calcite.rel.rules.AggregateRemoveRule;
import org.apache.calcite.rel.rules.AggregateUnionAggregateRule;
import org.apache.calcite.rel.rules.CalcMergeRule;
-import org.apache.calcite.rel.rules.CalcRemoveRule;
import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
import org.apache.calcite.rel.rules.FilterCalcMergeRule;
import org.apache.calcite.rel.rules.FilterJoinRule;
@@ -74,7 +73,8 @@ public class BeamRuleSets {
ProjectCalcMergeRule.INSTANCE,
FilterToCalcRule.INSTANCE,
ProjectToCalcRule.INSTANCE,
- CalcRemoveRule.INSTANCE,
+ // disabled due to https://issues.apache.org/jira/browse/BEAM-6810
+ // CalcRemoveRule.INSTANCE,
CalcMergeRule.INSTANCE,
// push a filter into a join
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index 0774039..214c4fa 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -22,11 +22,11 @@ import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.INTEGER
import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARCHAR;
import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
import static org.apache.beam.sdk.schemas.Schema.toSchema;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
import java.util.stream.Stream;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
@@ -232,7 +232,11 @@ public class BeamSqlCliTest {
+ "COMMENT '' LOCATION '/home/admin/orders'");
String plan = cli.explainQuery("select * from person");
- assertThat(plan, equalTo("BeamIOSourceRel(table=[[beam, person]])\n"));
+ assertThat(
+ plan,
+ equalTo(
+ "BeamCalcRel(expr#0..2=[{inputs}], proj#0..2=[{exprs}])\n"
+ + " BeamIOSourceRel(table=[[beam, person]])\n"));
}
@Test
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index ec7db20..068aeee 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -26,10 +26,12 @@ import java.util.stream.IntStream;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Assert;
import org.junit.Test;
/** Tests for field-project in queries with BOUNDED PCollection. */
@@ -53,7 +55,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PAssert.that(result).containsInAnyOrder(rowsInTableA.get(0));
- pipeline.run().waitUntilFinish();
+ pipeline.run();
}
/** select partial fields with bounded PCollection. */
@@ -81,7 +83,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PAssert.that(result).containsInAnyOrder(row);
- pipeline.run().waitUntilFinish();
+ pipeline.run();
}
/** select partial fields for multiple rows with bounded PCollection. */
@@ -110,7 +112,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PAssert.that(result).containsInAnyOrder(expectedRows);
- pipeline.run().waitUntilFinish();
+ pipeline.run();
}
private Row rowAtIndex(Schema schema, int index) {
@@ -145,7 +147,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PAssert.that(result).containsInAnyOrder(expectedRows);
- pipeline.run().waitUntilFinish();
+ pipeline.run();
}
/** select literal field with bounded PCollection. */
@@ -173,7 +175,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PAssert.that(result).containsInAnyOrder(row);
- pipeline.run().waitUntilFinish();
+ pipeline.run();
}
@Test
@@ -188,6 +190,31 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
PCollectionTuple.of(new TupleTag<>("TABLE_A"), boundedInput1)
.apply("testProjectUnknownField", SqlTransform.query(sql));
- pipeline.run().waitUntilFinish();
+ pipeline.run();
+ }
+
+ /**
+ * Trivial programs project precisely their input fields, without dropping or re-ordering them.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/BEAM-6810">BEAM-6810</a>
+ */
+ @Test
+ public void testTrivialProjection() {
+ String sql = "SELECT c_int64 as abc FROM PCOLLECTION";
+ Schema inputSchema = Schema.of(Schema.Field.of("c_int64", Schema.FieldType.INT64));
+ Schema outputSchema = Schema.of(Schema.Field.of("abc", Schema.FieldType.INT64));
+
+ PCollection<Row> input =
+ pipeline.apply(
+ Create.of(Row.withSchema(inputSchema).addValue(42L).build())
+ .withRowSchema(inputSchema));
+
+ PCollection<Row> result = input.apply(SqlTransform.query(sql));
+
+ Assert.assertEquals(outputSchema, result.getSchema());
+
+ PAssert.that(result).containsInAnyOrder(Row.withSchema(outputSchema).addValue(42L).build());
+
+ pipeline.run();
}
}