You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/05/04 07:13:10 UTC
[2/3] beam git commit: redesign BeamSqlExpression to execute Calcite
SQL expression.
redesign BeamSqlExpression to execute Calcite SQL expression.
Changes:
1. revert BEAM dependency to 0.6.0 to avoid impact of changes in master branch;
2. updates as discussed during review;
refine BeamSqlRowCoder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/464cc275
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/464cc275
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/464cc275
Branch: refs/heads/DSL_SQL
Commit: 464cc275952305bf51ecfdf056e784441c9c2272
Parents: aa07a1d
Author: mingmxu <mi...@ebay.com>
Authored: Sat Apr 22 22:20:25 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Thu May 4 00:12:11 2017 -0700
----------------------------------------------------------------------
dsls/sql/pom.xml | 47 ++++++-
.../beam/dsls/sql/example/BeamSqlExample.java | 5 -
.../exception/BeamInvalidOperatorException.java | 34 +++++
.../exception/BeamSqlUnsupportedException.java | 34 +++++
.../sql/exception/InvalidFieldException.java | 34 +++++
.../beam/dsls/sql/exception/package-info.java | 23 +++
.../dsls/sql/interpreter/BeamSQLFnExecutor.java | 140 +++++++++++++++++++
.../sql/interpreter/BeamSQLSpELExecutor.java | 127 -----------------
.../dsls/sql/interpreter/CalciteToSpEL.java | 81 -----------
.../operator/BeamSqlAndExpression.java | 60 ++++++++
.../operator/BeamSqlCompareExpression.java | 94 +++++++++++++
.../operator/BeamSqlEqualExpression.java | 48 +++++++
.../interpreter/operator/BeamSqlExpression.java | 62 ++++++++
.../operator/BeamSqlInputRefExpression.java | 46 ++++++
.../operator/BeamSqlIsNotNullExpression.java | 51 +++++++
.../operator/BeamSqlIsNullExpression.java | 51 +++++++
.../BeamSqlLargerThanEqualExpression.java | 49 +++++++
.../operator/BeamSqlLargerThanExpression.java | 49 +++++++
.../BeamSqlLessThanEqualExpression.java | 49 +++++++
.../operator/BeamSqlLessThanExpression.java | 49 +++++++
.../operator/BeamSqlNotEqualExpression.java | 48 +++++++
.../operator/BeamSqlOrExpression.java | 60 ++++++++
.../interpreter/operator/BeamSqlPrimitive.java | 102 ++++++++++++++
.../sql/interpreter/operator/package-info.java | 22 +++
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 8 +-
.../beam/dsls/sql/planner/BeamSqlRunner.java | 15 +-
.../planner/BeamSqlUnsupportedException.java | 38 -----
.../apache/beam/dsls/sql/rel/BeamFilterRel.java | 4 +-
.../beam/dsls/sql/rel/BeamProjectRel.java | 4 +-
.../beam/dsls/sql/schema/BaseBeamTable.java | 5 -
.../beam/dsls/sql/schema/BeamSQLRecordType.java | 4 -
.../dsls/sql/schema/BeamSQLRecordTypeCoder.java | 15 +-
.../apache/beam/dsls/sql/schema/BeamSQLRow.java | 76 ++++++----
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 48 ++++---
.../dsls/sql/schema/InvalidFieldException.java | 34 -----
.../sql/schema/kafka/BeamKafkaCSVTable.java | 5 -
.../dsls/sql/schema/kafka/BeamKafkaTable.java | 5 -
.../dsls/sql/transform/BeamSQLFilterFn.java | 4 -
.../sql/transform/BeamSQLOutputToConsoleFn.java | 4 -
.../dsls/sql/transform/BeamSQLProjectFn.java | 5 -
.../sql/interpreter/BeamSQLFnExecutorTest.java | 101 +++++++++++++
.../interpreter/BeamSQLFnExecutorTestBase.java | 91 ++++++++++++
.../operator/BeamNullExperssionTest.java | 53 +++++++
.../operator/BeamSqlAndOrExpressionTest.java | 59 ++++++++
.../operator/BeamSqlCompareExpressionTest.java | 108 ++++++++++++++
.../operator/BeamSqlInputRefExpressionTest.java | 58 ++++++++
.../operator/BeamSqlPrimitiveTest.java | 60 ++++++++
.../beam/dsls/sql/planner/BasePlanner.java | 28 +++-
.../sql/planner/BeamPlannerExplainTest.java | 5 +-
.../dsls/sql/planner/BeamPlannerSubmitTest.java | 7 +-
.../dsls/sql/planner/MockedBeamSQLTable.java | 14 +-
51 files changed, 1803 insertions(+), 420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index 21c8def..e2f09be 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -116,7 +116,42 @@
</plugin>
</plugins>
</build>
-
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <version>0.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>0.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-kafka</artifactId>
+ <version>0.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
+ <version>0.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-common-runner-api</artifactId>
+ <version>0.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-construction-java</artifactId>
+ <version>0.6.0</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
<groupId>junit</groupId>
@@ -130,6 +165,12 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-lite</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
@@ -142,10 +183,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-expression</artifactId>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index d32bc59..303835f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -66,11 +66,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
*/
public class BeamSqlExample implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = 3673487843555563904L;
-
public static void main(String[] args) throws Exception {
BeamSqlRunner runner = new BeamSqlRunner();
runner.addTable("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders"));
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java
new file mode 100644
index 0000000..281ef89
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamInvalidOperatorException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.exception;
+
+/**
+ * Unchecked exception signalling that an operation is not supported.
+ * It can be created with or without a detail message.
+ */
+public class BeamInvalidOperatorException extends RuntimeException {
+
+ public BeamInvalidOperatorException(String string) {
+ super(string);
+ }
+
+ public BeamInvalidOperatorException() {
+ super();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java
new file mode 100644
index 0000000..02e843b
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/BeamSqlUnsupportedException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.exception;
+
+/**
+ * Generic unchecked exception for unsupported features/functions in BeamSQL.
+ * It can be created with or without a detail message.
+ */
+public class BeamSqlUnsupportedException extends RuntimeException {
+
+ public BeamSqlUnsupportedException(String string) {
+ super(string);
+ }
+
+ public BeamSqlUnsupportedException() {
+ super();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java
new file mode 100644
index 0000000..82ebabe
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/InvalidFieldException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.exception;
+
+/**
+ * Unchecked exception raised when a field value and its field type are not compatible.
+ * It can be created with or without a detail message.
+ */
+public class InvalidFieldException extends RuntimeException {
+
+ public InvalidFieldException() {
+ super();
+ }
+
+ public InvalidFieldException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java
new file mode 100644
index 0000000..619100c
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/exception/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Exceptions in BeamSQL.
+ *
+ */
+package org.apache.beam.dsls.sql.exception;
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
new file mode 100644
index 0000000..32e2ffc
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNotNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+/**
+ * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
+ * {@code BeamSQLFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
+ * which can be evaluated against the {@link BeamSQLRow}.
+ *
+ */
+public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
+  /** One expression for a filter condition, or one per projected field. */
+  protected List<BeamSqlExpression> exps;
+
+  /** Builds the expressions of a {@link BeamFilterRel} or a {@link BeamProjectRel}. */
+  public BeamSQLFnExecutor(BeamRelNode relNode) {
+    this.exps = new ArrayList<>();
+    if (relNode instanceof BeamFilterRel) {
+      exps.add(buildExpression(((BeamFilterRel) relNode).getCondition()));
+    } else if (relNode instanceof BeamProjectRel) {
+      for (RexNode rexNode : ((BeamProjectRel) relNode).getProjects()) {
+        exps.add(buildExpression(rexNode));
+      }
+    } else {
+      throw new BeamSqlUnsupportedException(
+          String.format("%s is not supported yet", relNode.getClass().toString()));
+    }
+  }
+
+  /**
+   * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
+   * and represents each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
+   * @throws BeamSqlUnsupportedException if the node type or operator name is not handled here
+   */
+  private BeamSqlExpression buildExpression(RexNode rexNode) {
+    if (rexNode instanceof RexLiteral) {
+      RexLiteral node = (RexLiteral) rexNode;
+      return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
+    } else if (rexNode instanceof RexInputRef) {
+      RexInputRef node = (RexInputRef) rexNode;
+      return new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+    } else if (rexNode instanceof RexCall) {
+      RexCall node = (RexCall) rexNode;
+      String opName = node.op.getName();
+      List<BeamSqlExpression> subExps = new ArrayList<>();
+      for (RexNode subNode : node.operands) {
+        subExps.add(buildExpression(subNode));
+      }
+      switch (opName) {
+        case "AND":
+          return new BeamSqlAndExpression(subExps);
+        case "OR":
+          return new BeamSqlOrExpression(subExps);
+        // comparison operators
+        case "=":
+          return new BeamSqlEqualExpression(subExps);
+        case "<>": // the name Calcite gives its not-equals operator
+          return new BeamSqlNotEqualExpression(subExps);
+        case ">":
+          return new BeamSqlLargerThanExpression(subExps);
+        case ">=":
+          return new BeamSqlLargerThanEqualExpression(subExps);
+        case "<":
+          return new BeamSqlLessThanExpression(subExps);
+        case "<=":
+          return new BeamSqlLessThanEqualExpression(subExps);
+        // postfix null checks only use the first operand
+        case "IS NULL":
+          return new BeamSqlIsNullExpression(subExps.get(0));
+        case "IS NOT NULL":
+          return new BeamSqlIsNotNullExpression(subExps.get(0));
+        default:
+          throw new BeamSqlUnsupportedException(
+              String.format("Operator: %s is not supported yet", opName));
+      }
+    } else {
+      throw new BeamSqlUnsupportedException(
+          String.format("%s is not supported yet", rexNode.getClass().toString()));
+    }
+  }
+
+  @Override
+  public void prepare() {
+  }
+
+  /** Evaluates every expression against {@code inputRecord}; values keep expression order. */
+  @Override
+  public List<Object> execute(BeamSQLRow inputRecord) {
+    List<Object> results = new ArrayList<>();
+    for (BeamSqlExpression exp : exps) {
+      results.add(exp.evaluate(inputRecord).getValue());
+    }
+    return results;
+  }
+
+  @Override
+  public void close() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
deleted file mode 100644
index 9c9c37f..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.dsls.sql.planner.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.rel.BeamFilterRel;
-import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.springframework.expression.Expression;
-import org.springframework.expression.ExpressionParser;
-import org.springframework.expression.spel.SpelParserConfiguration;
-import org.springframework.expression.spel.standard.SpelExpressionParser;
-import org.springframework.expression.spel.support.StandardEvaluationContext;
-
-/**
- * {@code BeamSQLSpELExecutor} is one implementation, to convert Calcite SQL
- * relational expression to SpEL expression.
- *
- */
-public class BeamSQLSpELExecutor implements BeamSQLExpressionExecutor {
- /**
- *
- */
- private static final long serialVersionUID = 6777232573390074408L;
-
- private List<String> spelString;
- private List<Expression> spelExpressions;
-
- public BeamSQLSpELExecutor(BeamRelNode relNode) {
- this.spelString = new ArrayList<>();
- if (relNode instanceof BeamFilterRel) {
- String filterSpEL = CalciteToSpEL
- .rexcall2SpEL((RexCall) ((BeamFilterRel) relNode).getCondition());
- spelString.add(filterSpEL);
- } else if (relNode instanceof BeamProjectRel) {
- spelString.addAll(createProjectExps((BeamProjectRel) relNode));
- // List<ProjectRule> projectRules =
- // for (int idx = 0; idx < projectRules.size(); ++idx) {
- // spelString.add(projectRules.get(idx).getProjectExp());
- // }
- } else {
- throw new BeamSqlUnsupportedException(
- String.format("%s is not supported yet", relNode.getClass().toString()));
- }
- }
-
- @Override
- public void prepare() {
- this.spelExpressions = new ArrayList<>();
-
- SpelParserConfiguration config = new SpelParserConfiguration(true, true);
- ExpressionParser parser = new SpelExpressionParser(config);
- for (String el : spelString) {
- spelExpressions.add(parser.parseExpression(el));
- }
- }
-
- @Override
- public List<Object> execute(BeamSQLRow inputRecord) {
- StandardEvaluationContext inContext = new StandardEvaluationContext();
- inContext.setVariable("in", inputRecord);
-
- List<Object> results = new ArrayList<>();
- for (Expression ep : spelExpressions) {
- results.add(ep.getValue(inContext));
- }
- return results;
- }
-
- @Override
- public void close() {
-
- }
-
- private List<String> createProjectExps(BeamProjectRel projectRel) {
- List<String> rules = new ArrayList<>();
-
- List<RexNode> exps = projectRel.getProjects();
-
- for (int idx = 0; idx < exps.size(); ++idx) {
- RexNode node = exps.get(idx);
- if (node == null) {
- rules.add("null");
- }
-
- if (node instanceof RexLiteral) {
- rules.add(((RexLiteral) node).getValue() + "");
- } else {
- if (node instanceof RexInputRef) {
- rules.add("#in.getFieldValue(" + ((RexInputRef) node).getIndex() + ")");
- }
- if (node instanceof RexCall) {
- rules.add(CalciteToSpEL.rexcall2SpEL((RexCall) node));
- }
- }
- }
-
- checkArgument(rules.size() == exps.size(), "missing projects rules after conversion.");
-
- return rules;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java
deleted file mode 100644
index 6cdc31b..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/CalciteToSpEL.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter;
-
-import com.google.common.base.Joiner;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.dsls.sql.planner.BeamSqlUnsupportedException;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * {@code CalciteToSpEL} is used in {@link BeamSQLSpELExecutor}, to convert a
- * relational expression {@link RexCall} to SpEL expression.
- *
- */
-public class CalciteToSpEL {
-
- public static String rexcall2SpEL(RexCall cdn) {
- List<String> parts = new ArrayList<>();
- for (RexNode subcdn : cdn.operands) {
- if (subcdn instanceof RexCall) {
- parts.add(rexcall2SpEL((RexCall) subcdn));
- } else {
- parts.add(subcdn instanceof RexInputRef
- ? "#in.getFieldValue(" + ((RexInputRef) subcdn).getIndex() + ")" : subcdn.toString());
- }
- }
-
- String opName = cdn.op.getName();
- switch (cdn.op.getClass().getSimpleName()) {
- case "SqlMonotonicBinaryOperator": // +-*
- case "SqlBinaryOperator": // > < = >= <= <> OR AND || / .
- switch (cdn.op.getName().toUpperCase()) {
- case "AND":
- return String.format(" ( %s ) ", Joiner.on("&&").join(parts));
- case "OR":
- return String.format(" ( %s ) ", Joiner.on("||").join(parts));
- case "=":
- return String.format(" ( %s ) ", Joiner.on("==").join(parts));
- case "<>":
- return String.format(" ( %s ) ", Joiner.on("!=").join(parts));
- default:
- return String.format(" ( %s ) ", Joiner.on(cdn.op.getName().toUpperCase()).join(parts));
- }
- case "SqlCaseOperator": // CASE
- return String.format(" (%s ? %s : %s)", parts.get(0), parts.get(1), parts.get(2));
- case "SqlCastFunction": // CAST
- return parts.get(0);
- case "SqlPostfixOperator":
- switch (opName.toUpperCase()) {
- case "IS NULL":
- return String.format(" null == %s ", parts.get(0));
- case "IS NOT NULL":
- return String.format(" null != %s ", parts.get(0));
- default:
- throw new BeamSqlUnsupportedException();
- }
- default:
- throw new BeamSqlUnsupportedException();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
new file mode 100644
index 0000000..55473b5
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'AND' operation.
+ */
+public class BeamSqlAndExpression extends BeamSqlExpression {
+
+  private BeamSqlAndExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /** Creates an 'AND' over {@code operands}; the output type is always BOOLEAN. */
+  public BeamSqlAndExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  /** Accepted only when every operand has a BOOLEAN output type. */
+  @Override
+  public boolean accept() {
+    for (BeamSqlExpression operand : operands) {
+      if (!operand.outputType.equals(SqlTypeName.BOOLEAN)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Evaluates operands in list order, stopping at the first one that yields false. */
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+    for (BeamSqlExpression operand : operands) {
+      BeamSqlPrimitive<Boolean> operandResult = operand.evaluate(inputRecord);
+      if (!operandResult.getValue()) {
+        return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false);
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
new file mode 100644
index 0000000..bfb798d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@link BeamSqlCompareExpression} is the base class for binary compare operations.
+ *
+ * <p>{@link #evaluate(BeamSQLRow)} evaluates both operands and dispatches to one of the
+ * {@code compare} overloads according to the output type of the first operand.
+ *
+ * <p>See {@link BeamSqlEqualExpression}, {@link BeamSqlLessThanExpression},
+ * {@link BeamSqlLessThanEqualExpression}, {@link BeamSqlLargerThanExpression},
+ * {@link BeamSqlLargerThanEqualExpression} and {@link BeamSqlNotEqualExpression} for more details.
+ *
+ */
+public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
+
+  private BeamSqlCompareExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  public BeamSqlCompareExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  /**
+   * Compare operation must have 2 operands.
+   */
+  @Override
+  public boolean accept() {
+    return operands.size() == 2;
+  }
+
+  /**
+   * Evaluates the left operand, then the right one, and compares the two values.
+   *
+   * @throws BeamSqlUnsupportedException if the type of the first operand is unknown or
+   *     has no matching {@code compare} overload.
+   */
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+    BeamSqlExpression leftOperand = operands.get(0);
+    Object leftValue = leftOperand.evaluate(inputRecord).getValue();
+    Object rightValue = operands.get(1).evaluate(inputRecord).getValue();
+
+    // Read the type through the getter instead of the inherited field:
+    // BeamSqlPrimitive.of() fills a field declared on BeamSqlPrimitive itself and leaves the
+    // inherited one null, so a direct field read fails when the left operand is a literal.
+    // NOTE(review): assumes BeamSqlPrimitive overrides getOutputType() -- confirm.
+    SqlTypeName compareType = leftOperand.getOutputType();
+    if (compareType == null) {
+      // Switching on null would raise a NullPointerException; report it as unsupported.
+      throw new BeamSqlUnsupportedException(toString());
+    }
+
+    switch (compareType) {
+      case BIGINT:
+      case DECIMAL:
+      case DOUBLE:
+      case FLOAT:
+      case INTEGER:
+      case SMALLINT:
+      case TINYINT:
+        return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+            compare((Number) leftValue, (Number) rightValue));
+      case BOOLEAN:
+        return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+            compare((Boolean) leftValue, (Boolean) rightValue));
+      case VARCHAR:
+        return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN,
+            compare((CharSequence) leftValue, (CharSequence) rightValue));
+      default:
+        throw new BeamSqlUnsupportedException(toString());
+    }
+  }
+
+  /**
+   * Compare between String values, mapping to {@link SqlTypeName#VARCHAR}.
+   */
+  public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue);
+
+  /**
+   * Compare between Boolean values, mapping to {@link SqlTypeName#BOOLEAN}.
+   */
+  public abstract Boolean compare(Boolean leftValue, Boolean rightValue);
+
+  /**
+   * Compare between Number values, including {@link SqlTypeName#BIGINT},
+   * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT},
+   * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and {@link SqlTypeName#TINYINT}.
+   */
+  public abstract Boolean compare(Number leftValue, Number rightValue);
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java
new file mode 100644
index 0000000..4bc487b
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlEqualExpression.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+
+/**
+ * {@code BeamSqlExpression} for {@code =} operation.
+ *
+ * <p>Null handling is the same for every operand type: two {@code null} values are
+ * considered equal, and a {@code null} never equals a non-null value.
+ */
+public class BeamSqlEqualExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlEqualExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    // String.valueOf(null) yields the text "null", which would make a NULL value equal
+    // to the literal 'null'; handle nulls explicitly instead.
+    if (leftValue == null || rightValue == null) {
+      return leftValue == null && rightValue == null;
+    }
+    return leftValue.toString().equals(rightValue.toString());
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    // Guard before unboxing: applying an operator to a null Boolean throws.
+    if (leftValue == null || rightValue == null) {
+      return leftValue == null && rightValue == null;
+    }
+    return leftValue.booleanValue() == rightValue.booleanValue();
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    if (leftValue == null || rightValue == null) {
+      return leftValue == null && rightValue == null;
+    }
+    // Single precision is kept only when an operand really is a Float. Every other
+    // numeric type is compared as double, because float cannot tell apart integers
+    // above 2^24 (16777216 and 16777217 collapse to the same float).
+    // NOTE(review): double is still inexact for BIGINT above 2^53 and for DECIMAL.
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() == rightValue.floatValue();
+    }
+    return leftValue.doubleValue() == rightValue.doubleValue();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
new file mode 100644
index 0000000..c44795f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base type of every expression in BeamSQL; it plays the role that {@link RexNode}
+ * plays in Calcite.
+ *
+ * <p>A concrete {@link BeamSqlExpression} is built from a list of operand expressions
+ * and declares the {@link SqlTypeName} of the value it produces.
+ *
+ */
+public abstract class BeamSqlExpression implements Serializable{
+  /** Operand expressions, exactly as handed to the constructor. */
+  protected List<BeamSqlExpression> operands;
+  /** Declared type of the value produced by this expression. */
+  protected SqlTypeName outputType;
+
+  /** Leaves both fields unset; a subclass using it has to supply its own state. */
+  protected BeamSqlExpression(){}
+
+  public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    this.outputType = outputType;
+    this.operands = operands;
+  }
+
+  public SqlTypeName getOutputType() {
+    return this.outputType;
+  }
+
+  public List<BeamSqlExpression> getOperands() {
+    return this.operands;
+  }
+
+  /**
+   * Checks that the input and output of this expression are supported.
+   */
+  public abstract boolean accept();
+
+  /**
+   * Applies this expression to the input record {@link BeamSQLRow};
+   * the resulting value is wrapped in a {@link BeamSqlPrimitive}.
+   */
+  public abstract BeamSqlPrimitive evaluate(BeamSQLRow inputRecord);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
new file mode 100644
index 0000000..612108f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * A primitive operation for direct field extraction.
+ *
+ * <p>The expression has no operands; it reads the field at a fixed index of the input
+ * record and wraps it with the declared output type.
+ */
+public class BeamSqlInputRefExpression extends BeamSqlExpression{
+  /** Index of the referenced field in the input record. */
+  private int inputRef;
+
+  public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) {
+    super(null, sqlTypeName);
+    this.inputRef = inputRef;
+  }
+
+  /**
+   * A field reference has no operands to validate, so it is always accepted.
+   */
+  @Override
+  public boolean accept() {
+    // The generated stub returned false, which marked every field reference as unsupported.
+    return true;
+  }
+
+  /**
+   * Returns the value of the referenced field of {@code inputRecord}.
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+    return BeamSqlPrimitive.of(outputType, inputRecord.getFieldValue(inputRef));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
new file mode 100644
index 0000000..784584e
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'IS NOT NULL' operation.
+ */
+public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
+
+ private BeamSqlIsNotNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+
+ public BeamSqlIsNotNullExpression(BeamSqlExpression operand){
+ this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+ }
+
+ /**
+ * only one operand is required.
+ */
+ @Override
+ public boolean accept() {
+ return operands.size() == 1;
+ }
+
+ @Override
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+ Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+ return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
new file mode 100644
index 0000000..b09ddbf
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'IS NULL' operation.
+ */
+public class BeamSqlIsNullExpression extends BeamSqlExpression {
+
+ private BeamSqlIsNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+
+ public BeamSqlIsNullExpression(BeamSqlExpression operand){
+ this(Arrays.asList(operand), SqlTypeName.BOOLEAN);
+ }
+
+ /**
+ * only one operand is required.
+ */
+ @Override
+ public boolean accept() {
+ return operands.size() == 1;
+ }
+
+ @Override
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+ Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
+ return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java
new file mode 100644
index 0000000..d78c020
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanEqualExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+
+/**
+ * {@code BeamSqlExpression} for {@code >=} operation.
+ *
+ * <p>For String and Number operands two {@code null} values satisfy the comparison,
+ * while a {@code null} compared with a non-null value does not. Boolean operands are
+ * not supported.
+ */
+public class BeamSqlLargerThanEqualExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLargerThanEqualExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    // String.valueOf(null) yields the text "null", which would order a NULL value as if
+    // it were that literal; handle nulls explicitly instead.
+    if (leftValue == null || rightValue == null) {
+      return leftValue == null && rightValue == null;
+    }
+    return leftValue.toString().compareTo(rightValue.toString()) >= 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new BeamInvalidOperatorException(">= is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    if (leftValue == null || rightValue == null) {
+      return leftValue == null && rightValue == null;
+    }
+    // Single precision is kept only when an operand really is a Float. Every other
+    // numeric type is compared as double, because float cannot tell apart integers
+    // above 2^24 (16777216 and 16777217 collapse to the same float).
+    // NOTE(review): double is still inexact for BIGINT above 2^53 and for DECIMAL.
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() >= rightValue.floatValue();
+    }
+    return leftValue.doubleValue() >= rightValue.doubleValue();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java
new file mode 100644
index 0000000..0b0d6f1
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLargerThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+
+/**
+ * {@code BeamSqlExpression} for {@code >} operation.
+ *
+ * <p>A strict comparison is never satisfied when either operand is {@code null}.
+ * Boolean operands are not supported.
+ */
+public class BeamSqlLargerThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLargerThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    // String.valueOf(null) yields the text "null", which would order a NULL value as if
+    // it were that literal; a comparison involving null is simply not satisfied.
+    if (leftValue == null || rightValue == null) {
+      return false;
+    }
+    return leftValue.toString().compareTo(rightValue.toString()) > 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new BeamInvalidOperatorException("> is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    // Two nulls used to report true here, i.e. NULL > NULL held; no value is strictly
+    // larger than itself, so any null makes the comparison false.
+    if (leftValue == null || rightValue == null) {
+      return false;
+    }
+    // Single precision is kept only when an operand really is a Float. Every other
+    // numeric type is compared as double, because float cannot tell apart integers
+    // above 2^24 (16777216 and 16777217 collapse to the same float).
+    // NOTE(review): double is still inexact for BIGINT above 2^53 and for DECIMAL.
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() > rightValue.floatValue();
+    }
+    return leftValue.doubleValue() > rightValue.doubleValue();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java
new file mode 100644
index 0000000..b6f7c9a
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanEqualExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+
+/**
+ * {@code BeamSqlExpression} for {@code <=} operation.
+ *
+ * <p>For String and Number operands two {@code null} values satisfy the comparison,
+ * while a {@code null} compared with a non-null value does not. Boolean operands are
+ * not supported.
+ */
+public class BeamSqlLessThanEqualExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanEqualExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    // String.valueOf(null) yields the text "null", which would order a NULL value as if
+    // it were that literal; handle nulls explicitly instead.
+    if (leftValue == null || rightValue == null) {
+      return leftValue == null && rightValue == null;
+    }
+    return leftValue.toString().compareTo(rightValue.toString()) <= 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new BeamInvalidOperatorException("<= is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    if (leftValue == null || rightValue == null) {
+      return leftValue == null && rightValue == null;
+    }
+    // Single precision is kept only when an operand really is a Float. Every other
+    // numeric type is compared as double, because float cannot tell apart integers
+    // above 2^24 (16777216 and 16777217 collapse to the same float).
+    // NOTE(review): double is still inexact for BIGINT above 2^53 and for DECIMAL.
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() <= rightValue.floatValue();
+    }
+    return leftValue.doubleValue() <= rightValue.doubleValue();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java
new file mode 100644
index 0000000..216a621
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlLessThanExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+
+/**
+ * {@code BeamSqlExpression} for {@code <} operation.
+ *
+ * <p>A strict comparison is never satisfied when either operand is {@code null}.
+ * Boolean operands are not supported.
+ */
+public class BeamSqlLessThanExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlLessThanExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    // String.valueOf(null) yields the text "null", which would order a NULL value as if
+    // it were that literal; a comparison involving null is simply not satisfied.
+    if (leftValue == null || rightValue == null) {
+      return false;
+    }
+    return leftValue.toString().compareTo(rightValue.toString()) < 0;
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    throw new BeamInvalidOperatorException("< is not supported for Boolean.");
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    // Two nulls used to report true here, i.e. NULL < NULL held; no value is strictly
+    // smaller than itself, so any null makes the comparison false.
+    if (leftValue == null || rightValue == null) {
+      return false;
+    }
+    // Single precision is kept only when an operand really is a Float. Every other
+    // numeric type is compared as double, because float cannot tell apart integers
+    // above 2^24 (16777216 and 16777217 collapse to the same float).
+    // NOTE(review): double is still inexact for BIGINT above 2^53 and for DECIMAL.
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() < rightValue.floatValue();
+    }
+    return leftValue.doubleValue() < rightValue.doubleValue();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java
new file mode 100644
index 0000000..2b093bf
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlNotEqualExpression.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+
+/**
+ * {@code BeamSqlExpression} for {@code <>} operation.
+ *
+ * <p>The comparison is never satisfied when either operand is {@code null}: two nulls
+ * are not "different", and a null compared with a value is unknown rather than true.
+ */
+public class BeamSqlNotEqualExpression extends BeamSqlCompareExpression {
+
+  public BeamSqlNotEqualExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public Boolean compare(CharSequence leftValue, CharSequence rightValue) {
+    // String.valueOf(null) yields the text "null", which would make a NULL value differ
+    // from everything except the literal 'null'; handle nulls explicitly instead.
+    if (leftValue == null || rightValue == null) {
+      return false;
+    }
+    return !leftValue.toString().equals(rightValue.toString());
+  }
+
+  @Override
+  public Boolean compare(Boolean leftValue, Boolean rightValue) {
+    // Guard before unboxing: applying an operator to a null Boolean throws.
+    if (leftValue == null || rightValue == null) {
+      return false;
+    }
+    return leftValue.booleanValue() != rightValue.booleanValue();
+  }
+
+  @Override
+  public Boolean compare(Number leftValue, Number rightValue) {
+    // Two nulls used to report true here, i.e. NULL <> NULL held, contradicting the
+    // equality operator; any null now makes the comparison false.
+    if (leftValue == null || rightValue == null) {
+      return false;
+    }
+    // Single precision is kept only when an operand really is a Float. Every other
+    // numeric type is compared as double, because float cannot tell apart integers
+    // above 2^24 (16777216 and 16777217 collapse to the same float).
+    // NOTE(review): double is still inexact for BIGINT above 2^53 and for DECIMAL.
+    if (leftValue instanceof Float || rightValue instanceof Float) {
+      return leftValue.floatValue() != rightValue.floatValue();
+    }
+    return leftValue.doubleValue() != rightValue.doubleValue();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
new file mode 100644
index 0000000..4d07af8
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'OR' operation.
+ */
+public class BeamSqlOrExpression extends BeamSqlExpression {
+
+ private BeamSqlOrExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+ super(operands, outputType);
+ }
+ public BeamSqlOrExpression(List<BeamSqlExpression> operands) {
+ this(operands, SqlTypeName.BOOLEAN);
+ }
+
+ @Override
+ public boolean accept() {
+ for (BeamSqlExpression exp : operands) {
+ // only accept BOOLEAN expression as operand
+ if (!exp.outputType.equals(SqlTypeName.BOOLEAN)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+ boolean result = false;
+ for (BeamSqlExpression exp : operands) {
+ BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+ result = result || expOut.getValue();
+ if (result) {
+ break;
+ }
+ }
+ return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
new file mode 100644
index 0000000..71852ff
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@link BeamSqlPrimitive} is a special, self-referencing {@link BeamSqlExpression}.
+ * It holds a typed value and returns itself from {@link #evaluate(BeamSQLRow)}.
+ * The SQL type lives in the inherited {@code outputType} field rather than a shadowing copy.
+ */
+public class BeamSqlPrimitive<T> extends BeamSqlExpression{
+  // Deliberately no own 'outputType' field: of() must fill the inherited one that others read.
+  private T value;
+
+  private BeamSqlPrimitive() {
+  }
+
+  private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * Builds a primitive from a type and a value; throws if the value's Java class mismatches.
+   */
+  public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){
+    BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<T>();
+    exp.outputType = outputType;
+    exp.value = value;
+    if (!exp.accept()) {
+      throw new BeamInvalidOperatorException(
+          String.format("value [%s] doesn't match type [%s].", value, outputType));
+    }
+    return exp;
+  }
+
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+
+  public T getValue() {
+    return value;
+  }
+
+  /** Checks the value's Java class against the SQL type; unsupported types throw. */
+  @Override
+  public boolean accept() {
+    if (value == null) {
+      return true; // NULL is a legal value for every type
+    }
+    switch (outputType) {
+    case BIGINT:
+      return value instanceof Long;
+    case DECIMAL:
+      return value instanceof BigDecimal;
+    case DOUBLE:
+      return value instanceof Double;
+    case FLOAT:
+      return value instanceof Float;
+    case INTEGER:
+      return value instanceof Integer;
+    case SMALLINT:
+      return value instanceof Short;
+    case TINYINT:
+      return value instanceof Byte;
+    case BOOLEAN:
+      return value instanceof Boolean;
+    case CHAR:
+      return value instanceof Character;
+    case VARCHAR:
+      return value instanceof String;
+    default:
+      throw new BeamSqlUnsupportedException(outputType.name());
+    }
+  }
+
+  /** A primitive is already evaluated, so the input row is ignored. */
+  @Override
+  public BeamSqlPrimitive<T> evaluate(BeamSQLRow inputRecord) {
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java
new file mode 100644
index 0000000..9b0a9a7
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for operators in {@link org.apache.calcite.sql.fun.SqlStdOperatorTable}.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index aac86d6..935dae7 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -122,7 +122,13 @@ public class BeamQueryPlanner {
*/
public BeamRelNode convertToBeamRel(String sqlStatement)
throws ValidationException, RelConversionException, SqlParseException {
- return (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
+ BeamRelNode beamRelNode;
+ try {
+ beamRelNode = (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
+ } finally {
+ planner.close();
+ }
+ return beamRelNode;
}
private RelNode validateAndConvert(SqlNode sqlNode)
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
index e457e80..708c507 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
@@ -36,17 +36,17 @@ import org.slf4j.LoggerFactory;
*
*/
public class BeamSqlRunner implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = -4708693435115005182L;
-
private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRunner.class);
private SchemaPlus schema = Frameworks.createRootSchema(true);
private BeamQueryPlanner planner = new BeamQueryPlanner(schema);
+ public BeamSqlRunner() {
+ //disable assertions in Calcite; the default covers every class the system loader initializes later.
+ ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false);
+ }
+
/**
* Add a schema.
*
@@ -70,7 +70,6 @@ public class BeamSqlRunner implements Serializable {
*/
public void submitQuery(String sqlString) throws Exception {
planner.submitToRun(sqlString);
- planner.planner.close();
}
/**
@@ -78,12 +77,10 @@ public class BeamSqlRunner implements Serializable {
*
*/
public String explainQuery(String sqlString)
- throws ValidationException, RelConversionException, SqlParseException {
+ throws ValidationException, RelConversionException, SqlParseException {
BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
String beamPlan = RelOptUtil.toString(exeTree);
System.out.println(String.format("beamPlan>\n%s", beamPlan));
-
- planner.planner.close();
return beamPlan;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
deleted file mode 100644
index 7cb5243..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-/**
- * Generic exception for un-supported operations.
- *
- */
-public class BeamSqlUnsupportedException extends RuntimeException {
- /**
- *
- */
- private static final long serialVersionUID = 3445015747629217342L;
-
- public BeamSqlUnsupportedException(String string) {
- super(string);
- }
-
- public BeamSqlUnsupportedException() {
- super();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 10dd1be..477be5a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -18,7 +18,7 @@
package org.apache.beam.dsls.sql.rel;
import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -58,7 +58,7 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
- BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
+ BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
PCollection<BeamSQLRow> projectStream = upstream.apply(stageName,
ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index dd731f8..7e27ab3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.rel;
import java.util.List;
import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
@@ -69,7 +69,7 @@ public class BeamProjectRel extends Project implements BeamRelNode {
PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
- BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
+ BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
.of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
index 81829e9..2ecfa38 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -37,11 +37,6 @@ import org.apache.calcite.schema.Statistics;
* Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
*/
public abstract class BaseBeamTable implements ScannableTable, Serializable {
-
- /**
- *
- */
- private static final long serialVersionUID = -1262988061830914193L;
private RelDataType relDataType;
protected BeamSQLRecordType beamSqlRecordType;
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
index 661b155..e4013bc 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
@@ -30,10 +30,6 @@ import org.apache.calcite.sql.type.SqlTypeName;
*/
//@DefaultCoder(BeamSQLRecordTypeCoder.class)
public class BeamSQLRecordType implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = -5318734648766104712L;
private List<String> fieldsName = new ArrayList<>();
private List<SqlTypeName> fieldsType = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
index ec330f1..b88a195 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
@@ -54,35 +54,34 @@ public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> {
for (SqlTypeName fieldType : value.getFieldsType()) {
stringCoder.encode(fieldType.name(), outStream, nested);
}
- outStream.flush();
+ //add a dummy field to indicate the end of record
+ intCoder.encode(value.size(), outStream, context);
}
@Override
public BeamSQLRecordType decode(InputStream inStream,
org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
BeamSQLRecordType typeRecord = new BeamSQLRecordType();
- Context nested = context.nested();
- int size = intCoder.decode(inStream, nested);
+ int size = intCoder.decode(inStream, context.nested());
for (int idx = 0; idx < size; ++idx) {
- typeRecord.getFieldsName().add(stringCoder.decode(inStream, nested));
+ typeRecord.getFieldsName().add(stringCoder.decode(inStream, context.nested()));
}
for (int idx = 0; idx < size; ++idx) {
- typeRecord.getFieldsType().add(SqlTypeName.valueOf(stringCoder.decode(inStream, nested)));
+ typeRecord.getFieldsType().add(
+ SqlTypeName.valueOf(stringCoder.decode(inStream, context.nested())));
}
+ intCoder.decode(inStream, context);
return typeRecord;
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
- // TODO Auto-generated method stub
return null;
}
@Override
public void verifyDeterministic()
throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
- // TODO Auto-generated method stub
-
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
index b65e23b..f9dab8a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import org.apache.beam.dsls.sql.exception.InvalidFieldException;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -28,10 +29,6 @@ import org.apache.calcite.sql.type.SqlTypeName;
*
*/
public class BeamSQLRow implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = 4569220242480160895L;
private List<Integer> nullFields = new ArrayList<>();
private List<Object> dataValues;
@@ -42,12 +39,15 @@ public class BeamSQLRow implements Serializable {
this.dataValues = new ArrayList<>();
for (int idx = 0; idx < dataType.size(); ++idx) {
dataValues.add(null);
+ nullFields.add(idx);
}
}
public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) {
- this.dataValues = dataValues;
- this.dataType = dataType;
+ this(dataType);
+ for (int idx = 0; idx < dataValues.size(); ++idx) {
+ addField(idx, dataValues.get(idx));
+ }
}
public void addField(String fieldName, Object fieldValue) {
@@ -56,19 +56,29 @@ public class BeamSQLRow implements Serializable {
public void addField(int index, Object fieldValue) {
if (fieldValue == null) {
- dataValues.set(index, fieldValue);
- if (!nullFields.contains(index)) {
- nullFields.add(index);
- }
return;
+ } else {
+ if (nullFields.contains(index)) {
+ nullFields.remove(nullFields.indexOf(index));
+ }
}
SqlTypeName fieldType = dataType.getFieldsType().get(index);
switch (fieldType) {
case INTEGER:
+ if (!(fieldValue instanceof Integer)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
case SMALLINT:
+ if (!(fieldValue instanceof Short)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
case TINYINT:
- if (!(fieldValue instanceof Integer)) {
+ if (!(fieldValue instanceof Byte)) {
throw new InvalidFieldException(
String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
}
@@ -97,24 +107,24 @@ public class BeamSQLRow implements Serializable {
String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
}
break;
- case TIME:
- case TIMESTAMP:
- if (!(fieldValue instanceof Date)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
default:
throw new UnsupportedDataTypeException(fieldType);
}
dataValues.set(index, fieldValue);
}
+ public short getShort(int idx) {
+ return (Short) getFieldValue(idx);
+ }
public int getInteger(int idx) {
return (Integer) getFieldValue(idx);
}
+ public float getFloat(int idx) {
+ return (Float) getFieldValue(idx);
+ }
+
public double getDouble(int idx) {
return (Double) getFieldValue(idx);
}
@@ -145,48 +155,52 @@ public class BeamSQLRow implements Serializable {
switch (fieldType) {
case INTEGER:
+ if (!(fieldValue instanceof Integer)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
case SMALLINT:
+ if (!(fieldValue instanceof Short)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
case TINYINT:
- if (!(fieldValue instanceof Integer)) {
+ if (!(fieldValue instanceof Byte)) {
throw new InvalidFieldException(
String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
} else {
- return Integer.valueOf(fieldValue.toString());
+ return fieldValue;
}
case DOUBLE:
if (!(fieldValue instanceof Double)) {
throw new InvalidFieldException(
String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
} else {
- return Double.valueOf(fieldValue.toString());
+ return fieldValue;
}
case BIGINT:
if (!(fieldValue instanceof Long)) {
throw new InvalidFieldException(
String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
} else {
- return Long.valueOf(fieldValue.toString());
+ return fieldValue;
}
case FLOAT:
if (!(fieldValue instanceof Float)) {
throw new InvalidFieldException(
String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
} else {
- return Float.valueOf(fieldValue.toString());
+ return fieldValue;
}
case VARCHAR:
if (!(fieldValue instanceof String)) {
throw new InvalidFieldException(
String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
} else {
- return fieldValue.toString();
- }
- case TIME:
- case TIMESTAMP:
- if (!(fieldValue instanceof Date)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
return fieldValue;
}
default: