You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/12 14:48:41 UTC
[4/5] beam git commit: rename SQL to Sql in class name
rename SQL to Sql in class name
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/743f0b3b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/743f0b3b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/743f0b3b
Branch: refs/heads/DSL_SQL
Commit: 743f0b3b62616d1c8fe77d306920fbcce81c8855
Parents: 5c1f2cb
Author: mingmxu <mi...@ebay.com>
Authored: Fri Jun 9 23:07:00 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 12 07:47:40 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/dsls/sql/BeamSql.java | 20 +-
.../org/apache/beam/dsls/sql/BeamSqlCli.java | 8 +-
.../beam/dsls/sql/example/BeamSqlExample.java | 16 +-
.../interpreter/BeamSQLExpressionExecutor.java | 43 ---
.../dsls/sql/interpreter/BeamSQLFnExecutor.java | 235 ------------
.../interpreter/BeamSqlExpressionExecutor.java | 43 +++
.../dsls/sql/interpreter/BeamSqlFnExecutor.java | 235 ++++++++++++
.../operator/BeamSqlAndExpression.java | 4 +-
.../operator/BeamSqlCaseExpression.java | 4 +-
.../operator/BeamSqlCompareExpression.java | 4 +-
.../interpreter/operator/BeamSqlExpression.java | 8 +-
.../operator/BeamSqlInputRefExpression.java | 4 +-
.../operator/BeamSqlIsNotNullExpression.java | 4 +-
.../operator/BeamSqlIsNullExpression.java | 4 +-
.../operator/BeamSqlOrExpression.java | 4 +-
.../interpreter/operator/BeamSqlPrimitive.java | 6 +-
.../operator/BeamSqlUdfExpression.java | 4 +-
.../operator/BeamSqlWindowEndExpression.java | 4 +-
.../operator/BeamSqlWindowExpression.java | 4 +-
.../operator/BeamSqlWindowStartExpression.java | 4 +-
.../arithmetic/BeamSqlArithmeticExpression.java | 6 +-
.../math/BeamSqlMathUnaryExpression.java | 4 +-
.../string/BeamSqlCharLengthExpression.java | 4 +-
.../string/BeamSqlConcatExpression.java | 4 +-
.../string/BeamSqlInitCapExpression.java | 4 +-
.../operator/string/BeamSqlLowerExpression.java | 4 +-
.../string/BeamSqlOverlayExpression.java | 4 +-
.../string/BeamSqlPositionExpression.java | 4 +-
.../string/BeamSqlSubstringExpression.java | 4 +-
.../operator/string/BeamSqlTrimExpression.java | 4 +-
.../operator/string/BeamSqlUpperExpression.java | 4 +-
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 4 +-
.../beam/dsls/sql/planner/BeamSQLRelUtils.java | 74 ----
.../beam/dsls/sql/planner/BeamSqlRelUtils.java | 74 ++++
.../beam/dsls/sql/rel/BeamAggregationRel.java | 58 +--
.../apache/beam/dsls/sql/rel/BeamFilterRel.java | 28 +-
.../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 12 +-
.../beam/dsls/sql/rel/BeamIOSourceRel.java | 14 +-
.../beam/dsls/sql/rel/BeamProjectRel.java | 28 +-
.../apache/beam/dsls/sql/rel/BeamRelNode.java | 4 +-
.../apache/beam/dsls/sql/rel/BeamSortRel.java | 32 +-
.../apache/beam/dsls/sql/rel/BeamValuesRel.java | 16 +-
.../beam/dsls/sql/schema/BaseBeamTable.java | 10 +-
.../dsls/sql/schema/BeamPCollectionTable.java | 10 +-
.../beam/dsls/sql/schema/BeamSQLRecordType.java | 97 -----
.../apache/beam/dsls/sql/schema/BeamSQLRow.java | 367 -------------------
.../beam/dsls/sql/schema/BeamSqlRecordType.java | 96 +++++
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 367 +++++++++++++++++++
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 16 +-
.../beam/dsls/sql/schema/BeamTableUtils.java | 10 +-
.../sql/schema/kafka/BeamKafkaCSVTable.java | 42 +--
.../dsls/sql/schema/kafka/BeamKafkaTable.java | 16 +-
.../dsls/sql/schema/text/BeamTextCSVTable.java | 6 +-
.../schema/text/BeamTextCSVTableIOReader.java | 18 +-
.../schema/text/BeamTextCSVTableIOWriter.java | 20 +-
.../transform/BeamAggregationTransforms.java | 56 +--
.../dsls/sql/transform/BeamSQLFilterFn.java | 62 ----
.../sql/transform/BeamSQLOutputToConsoleFn.java | 41 ---
.../dsls/sql/transform/BeamSQLProjectFn.java | 72 ----
.../dsls/sql/transform/BeamSqlFilterFn.java | 62 ++++
.../sql/transform/BeamSqlOutputToConsoleFn.java | 41 +++
.../dsls/sql/transform/BeamSqlProjectFn.java | 72 ++++
.../beam/dsls/sql/transform/package-info.java | 2 +-
.../sql/interpreter/BeamSQLFnExecutorTest.java | 268 --------------
.../interpreter/BeamSQLFnExecutorTestBase.java | 91 -----
.../sql/interpreter/BeamSqlFnExecutorTest.java | 268 ++++++++++++++
.../interpreter/BeamSqlFnExecutorTestBase.java | 91 +++++
.../operator/BeamNullExperssionTest.java | 4 +-
.../operator/BeamSqlAndOrExpressionTest.java | 4 +-
.../operator/BeamSqlCaseExpressionTest.java | 4 +-
.../operator/BeamSqlCompareExpressionTest.java | 4 +-
.../operator/BeamSqlInputRefExpressionTest.java | 4 +-
.../operator/BeamSqlPrimitiveTest.java | 4 +-
.../operator/BeamSqlUdfExpressionTest.java | 4 +-
.../BeamSqlArithmeticExpressionTest.java | 4 +-
.../math/BeamSqlMathUnaryExpressionTest.java | 4 +-
.../string/BeamSqlCharLengthExpressionTest.java | 4 +-
.../string/BeamSqlConcatExpressionTest.java | 4 +-
.../string/BeamSqlInitCapExpressionTest.java | 4 +-
.../string/BeamSqlLowerExpressionTest.java | 4 +-
.../string/BeamSqlOverlayExpressionTest.java | 4 +-
.../string/BeamSqlPositionExpressionTest.java | 4 +-
.../string/BeamSqlSubstringExpressionTest.java | 4 +-
.../string/BeamSqlTrimExpressionTest.java | 4 +-
.../string/BeamSqlUpperExpressionTest.java | 4 +-
.../beam/dsls/sql/planner/BasePlanner.java | 16 +-
.../sql/planner/BeamGroupByPipelineTest.java | 16 +-
.../sql/planner/BeamInvalidGroupByTest.java | 6 +-
.../BeamPlannerAggregationSubmitTest.java | 28 +-
.../dsls/sql/planner/BeamPlannerSubmitTest.java | 10 +-
.../dsls/sql/planner/MockedBeamSQLTable.java | 161 --------
.../dsls/sql/planner/MockedBeamSqlTable.java | 161 ++++++++
.../beam/dsls/sql/rel/BeamSortRelTest.java | 44 +--
.../beam/dsls/sql/rel/BeamValuesRelTest.java | 22 +-
.../sql/schema/BeamPCollectionTableTest.java | 6 +-
.../dsls/sql/schema/BeamSqlRowCoderTest.java | 4 +-
.../sql/schema/kafka/BeamKafkaCSVTableTest.java | 16 +-
.../sql/schema/text/BeamTextCSVTableTest.java | 18 +-
.../transform/BeamAggregationTransformTest.java | 56 +--
.../schema/transform/BeamTransformBaseTest.java | 22 +-
100 files changed, 1953 insertions(+), 1954 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index 8c2c5ad..809fed3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql;
import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.PTransform;
@@ -76,14 +76,14 @@ public class BeamSql {
* Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
*
* <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing
- * all the input tables and results in a {@code PCollection<BeamSQLRow>} representing the output
+ * all the input tables and results in a {@code PCollection<BeamSqlRow>} representing the output
* table. The {@link PCollectionTuple} contains the mapping from {@code table names} to
- * {@code PCollection<BeamSQLRow>}, each representing an input table.
+ * {@code PCollection<BeamSqlRow>}, each representing an input table.
*
* <p>It is an error to apply a {@link PCollectionTuple} missing any {@code table names}
* referenced within the query.
*/
- public static PTransform<PCollectionTuple, PCollection<BeamSQLRow>> query(String sqlQuery) {
+ public static PTransform<PCollectionTuple, PCollection<BeamSqlRow>> query(String sqlQuery) {
return new QueryTransform(sqlQuery);
}
@@ -94,7 +94,7 @@ public class BeamSql {
* <p>This is a simplified form of {@link #query(String)} where the query must reference
* a single input table.
*/
- public static PTransform<PCollection<BeamSQLRow>, PCollection<BeamSQLRow>>
+ public static PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>>
simpleQuery(String sqlQuery) throws Exception {
return new SimpleQueryTransform(sqlQuery);
}
@@ -102,14 +102,14 @@ public class BeamSql {
/**
* A {@link PTransform} representing an execution plan for a SQL query.
*/
- public static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamSQLRow>> {
+ public static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
private String sqlQuery;
public QueryTransform(String sqlQuery) {
this.sqlQuery = sqlQuery;
}
@Override
- public PCollection<BeamSQLRow> expand(PCollectionTuple input) {
+ public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
BeamRelNode beamRelNode = null;
try {
beamRelNode = BeamSqlEnv.planner.convertToBeamRel(sqlQuery);
@@ -130,7 +130,7 @@ public class BeamSql {
* a single table.
*/
public static class SimpleQueryTransform
- extends PTransform<PCollection<BeamSQLRow>, PCollection<BeamSQLRow>> {
+ extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
private String sqlQuery;
public SimpleQueryTransform(String sqlQuery) {
this.sqlQuery = sqlQuery;
@@ -141,7 +141,7 @@ public class BeamSql {
}
@Override
- public PCollection<BeamSQLRow> expand(PCollection<BeamSQLRow> input) {
+ public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
SqlNode sqlNode;
try {
sqlNode = BeamSqlEnv.planner.parseQuery(sqlQuery);
@@ -156,7 +156,7 @@ public class BeamSql {
String tableName = select.getFrom().toString();
BeamSqlEnv.registerTable(tableName,
new BeamPCollectionTable(input, inputCoder.getTableSchema().toRelDataType()));
- return PCollectionTuple.of(new TupleTag<BeamSQLRow>(tableName), input)
+ return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input)
.apply(BeamSql.query(sqlQuery));
} else {
throw new BeamSqlUnsupportedException(sqlNode.toString());
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
index 6591589..a55f655 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
@@ -18,7 +18,7 @@
package org.apache.beam.dsls.sql;
import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -48,7 +48,7 @@ public class BeamSqlCli {
/**
* compile SQL, and return a {@link Pipeline}.
*/
- public static PCollection<BeamSQLRow> compilePipeline(String sqlStatement) throws Exception{
+ public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement) throws Exception{
PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
.as(PipelineOptions.class); // FlinkPipelineOptions.class
options.setJobName("BeamPlanCreator");
@@ -60,9 +60,9 @@ public class BeamSqlCli {
/**
* compile SQL, and return a {@link Pipeline}.
*/
- public static PCollection<BeamSQLRow> compilePipeline(String sqlStatement, Pipeline basePipeline)
+ public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline)
throws Exception{
- PCollection<BeamSQLRow> resultStream =
+ PCollection<BeamSqlRow> resultStream =
BeamSqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline);
return resultStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 6a1b81d..4d7328e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -18,8 +18,8 @@
package org.apache.beam.dsls.sql.example;
import org.apache.beam.dsls.sql.BeamSql;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -45,28 +45,28 @@ public class BeamSqlExample {
Pipeline p = Pipeline.create(options);
//define the input row format
- BeamSQLRecordType type = new BeamSQLRecordType();
+ BeamSqlRecordType type = new BeamSqlRecordType();
type.addField("c1", SqlTypeName.INTEGER);
type.addField("c2", SqlTypeName.VARCHAR);
type.addField("c3", SqlTypeName.DOUBLE);
- BeamSQLRow row = new BeamSQLRow(type);
+ BeamSqlRow row = new BeamSqlRow(type);
row.addField(0, 1);
row.addField(1, "row");
row.addField(2, 1.0);
//create a source PCollection with Create.of();
- PCollection<BeamSQLRow> inputTable = PBegin.in(p).apply(Create.of(row)
+ PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
.withCoder(new BeamSqlRowCoder(type)));
//run a simple SQL query over input PCollection;
String sql = "select c2, c3 from TABLE_A where c1=1";
- PCollection<BeamSQLRow> outputStream = inputTable.apply(BeamSql.simpleQuery(sql));
+ PCollection<BeamSqlRow> outputStream = inputTable.apply(BeamSql.simpleQuery(sql));
//log out the output record;
outputStream.apply("log_result",
- MapElements.<BeamSQLRow, Void>via(new SimpleFunction<BeamSQLRow, Void>() {
+ MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
@Override
- public Void apply(BeamSQLRow input) {
+ public Void apply(BeamSqlRow input) {
LOG.info(input.valueInString());
return null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
deleted file mode 100644
index 1285280..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * {@code BeamSQLExpressionExecutor} fills the gap between relational
- * expressions in Calcite SQL and executable code.
- *
- */
-public interface BeamSQLExpressionExecutor extends Serializable {
-
- /**
- * invoked before data processing.
- */
- void prepare();
-
- /**
- * apply transformation to input record {@link BeamSQLRow}.
- *
- */
- List<Object> execute(BeamSQLRow inputRecord);
-
- void close();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
deleted file mode 100644
index 51fe2c9..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNotNullExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNullExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAbsExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSqrtExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
-import org.apache.beam.dsls.sql.rel.BeamFilterRel;
-import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
-import org.apache.calcite.util.NlsString;
-
-/**
- * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
- * {@code BeamSQLFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
- * which can be evaluated against the {@link BeamSQLRow}.
- *
- */
-public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
- protected List<BeamSqlExpression> exps;
-
- public BeamSQLFnExecutor(BeamRelNode relNode) {
- this.exps = new ArrayList<>();
- if (relNode instanceof BeamFilterRel) {
- BeamFilterRel filterNode = (BeamFilterRel) relNode;
- RexNode condition = filterNode.getCondition();
- exps.add(buildExpression(condition));
- } else if (relNode instanceof BeamProjectRel) {
- BeamProjectRel projectNode = (BeamProjectRel) relNode;
- List<RexNode> projects = projectNode.getProjects();
- for (RexNode rexNode : projects) {
- exps.add(buildExpression(rexNode));
- }
- } else {
- throw new BeamSqlUnsupportedException(
- String.format("%s is not supported yet", relNode.getClass().toString()));
- }
- }
-
- /**
- * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
- * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
- */
- static BeamSqlExpression buildExpression(RexNode rexNode) {
- if (rexNode instanceof RexLiteral) {
- RexLiteral node = (RexLiteral) rexNode;
- // NlsString is not serializable, we need to convert
- // it to string explicitly.
- if (SqlTypeName.CHAR_TYPES.contains(node.getTypeName())
- && node.getValue() instanceof NlsString) {
- return BeamSqlPrimitive.of(node.getTypeName(), ((NlsString) node.getValue()).getValue());
- } else {
- return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
- }
- } else if (rexNode instanceof RexInputRef) {
- RexInputRef node = (RexInputRef) rexNode;
- return new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
- } else if (rexNode instanceof RexCall) {
- RexCall node = (RexCall) rexNode;
- String opName = node.op.getName();
- List<BeamSqlExpression> subExps = new ArrayList<>();
- for (RexNode subNode : node.getOperands()) {
- subExps.add(buildExpression(subNode));
- }
- switch (opName) {
- case "AND":
- return new BeamSqlAndExpression(subExps);
- case "OR":
- return new BeamSqlOrExpression(subExps);
-
- case "=":
- return new BeamSqlEqualExpression(subExps);
- case "<>=":
- return new BeamSqlNotEqualExpression(subExps);
- case ">":
- return new BeamSqlLargerThanExpression(subExps);
- case ">=":
- return new BeamSqlLargerThanEqualExpression(subExps);
- case "<":
- return new BeamSqlLessThanExpression(subExps);
- case "<=":
- return new BeamSqlLessThanEqualExpression(subExps);
-
- // arithmetic operators
- case "+":
- return new BeamSqlPlusExpression(subExps);
- case "-":
- return new BeamSqlMinusExpression(subExps);
- case "*":
- return new BeamSqlMultiplyExpression(subExps);
- case "/":
- return new BeamSqlDivideExpression(subExps);
- case "MOD":
- return new BeamSqlModExpression(subExps);
-
- case "ABS":
- return new BeamSqlAbsExpression(subExps);
- case "SQRT":
- return new BeamSqlSqrtExpression(subExps);
-
- // string operators
- case "||":
- return new BeamSqlConcatExpression(subExps);
- case "POSITION":
- return new BeamSqlPositionExpression(subExps);
- case "CHAR_LENGTH":
- case "CHARACTER_LENGTH":
- return new BeamSqlCharLengthExpression(subExps);
- case "UPPER":
- return new BeamSqlUpperExpression(subExps);
- case "LOWER":
- return new BeamSqlLowerExpression(subExps);
- case "TRIM":
- return new BeamSqlTrimExpression(subExps);
- case "SUBSTRING":
- return new BeamSqlSubstringExpression(subExps);
- case "OVERLAY":
- return new BeamSqlOverlayExpression(subExps);
- case "INITCAP":
- return new BeamSqlInitCapExpression(subExps);
-
- case "CASE":
- return new BeamSqlCaseExpression(subExps);
-
- case "IS NULL":
- return new BeamSqlIsNullExpression(subExps.get(0));
- case "IS NOT NULL":
- return new BeamSqlIsNotNullExpression(subExps.get(0));
-
- case "HOP":
- case "TUMBLE":
- case "SESSION":
- return new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
- case "HOP_START":
- case "TUMBLE_START":
- case "SESSION_START":
- return new BeamSqlWindowStartExpression();
- case "HOP_END":
- case "TUMBLE_END":
- case "SESSION_END":
- return new BeamSqlWindowEndExpression();
- default:
- //handle UDF
- if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
- SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
- ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
- return new BeamSqlUdfExpression(fn.method, subExps,
- ((RexCall) rexNode).type.getSqlTypeName());
- } else {
- throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
- }
- }
- } else {
- throw new BeamSqlUnsupportedException(
- String.format("%s is not supported yet", rexNode.getClass().toString()));
- }
- }
-
- @Override
- public void prepare() {
- }
-
- @Override
- public List<Object> execute(BeamSQLRow inputRecord) {
- List<Object> results = new ArrayList<>();
- for (BeamSqlExpression exp : exps) {
- results.add(exp.evaluate(inputRecord).getValue());
- }
- return results;
- }
-
- @Override
- public void close() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
new file mode 100644
index 0000000..a314bf4
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+
+/**
+ * {@code BeamSqlExpressionExecutor} fills the gap between relational
+ * expressions in Calcite SQL and executable code.
+ *
+ */
+public interface BeamSqlExpressionExecutor extends Serializable {
+
+ /**
+ * invoked before data processing.
+ */
+ void prepare();
+
+ /**
+ * apply transformation to input record {@link BeamSqlRow}.
+ *
+ */
+ List<Object> execute(BeamSqlRow inputRecord);
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
new file mode 100644
index 0000000..1d1dfc1
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNotNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAbsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSqrtExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
+ * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
+ * which can be evaluated against the {@link BeamSqlRow}.
+ *
+ */
+public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
+ protected List<BeamSqlExpression> exps;
+
+ public BeamSqlFnExecutor(BeamRelNode relNode) {
+ this.exps = new ArrayList<>();
+ if (relNode instanceof BeamFilterRel) {
+ BeamFilterRel filterNode = (BeamFilterRel) relNode;
+ RexNode condition = filterNode.getCondition();
+ exps.add(buildExpression(condition));
+ } else if (relNode instanceof BeamProjectRel) {
+ BeamProjectRel projectNode = (BeamProjectRel) relNode;
+ List<RexNode> projects = projectNode.getProjects();
+ for (RexNode rexNode : projects) {
+ exps.add(buildExpression(rexNode));
+ }
+ } else {
+ throw new BeamSqlUnsupportedException(
+ String.format("%s is not supported yet", relNode.getClass().toString()));
+ }
+ }
+
+ /**
+ * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
+ * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
+ */
+ static BeamSqlExpression buildExpression(RexNode rexNode) {
+ if (rexNode instanceof RexLiteral) {
+ RexLiteral node = (RexLiteral) rexNode;
+ // NlsString is not serializable, we need to convert
+ // it to string explicitly.
+ if (SqlTypeName.CHAR_TYPES.contains(node.getTypeName())
+ && node.getValue() instanceof NlsString) {
+ return BeamSqlPrimitive.of(node.getTypeName(), ((NlsString) node.getValue()).getValue());
+ } else {
+ return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
+ }
+ } else if (rexNode instanceof RexInputRef) {
+ RexInputRef node = (RexInputRef) rexNode;
+ return new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+ } else if (rexNode instanceof RexCall) {
+ RexCall node = (RexCall) rexNode;
+ String opName = node.op.getName();
+ List<BeamSqlExpression> subExps = new ArrayList<>();
+ for (RexNode subNode : node.getOperands()) {
+ subExps.add(buildExpression(subNode));
+ }
+ switch (opName) {
+ case "AND":
+ return new BeamSqlAndExpression(subExps);
+ case "OR":
+ return new BeamSqlOrExpression(subExps);
+
+ case "=":
+ return new BeamSqlEqualExpression(subExps);
+ case "<>=":
+ return new BeamSqlNotEqualExpression(subExps);
+ case ">":
+ return new BeamSqlLargerThanExpression(subExps);
+ case ">=":
+ return new BeamSqlLargerThanEqualExpression(subExps);
+ case "<":
+ return new BeamSqlLessThanExpression(subExps);
+ case "<=":
+ return new BeamSqlLessThanEqualExpression(subExps);
+
+ // arithmetic operators
+ case "+":
+ return new BeamSqlPlusExpression(subExps);
+ case "-":
+ return new BeamSqlMinusExpression(subExps);
+ case "*":
+ return new BeamSqlMultiplyExpression(subExps);
+ case "/":
+ return new BeamSqlDivideExpression(subExps);
+ case "MOD":
+ return new BeamSqlModExpression(subExps);
+
+ case "ABS":
+ return new BeamSqlAbsExpression(subExps);
+ case "SQRT":
+ return new BeamSqlSqrtExpression(subExps);
+
+ // string operators
+ case "||":
+ return new BeamSqlConcatExpression(subExps);
+ case "POSITION":
+ return new BeamSqlPositionExpression(subExps);
+ case "CHAR_LENGTH":
+ case "CHARACTER_LENGTH":
+ return new BeamSqlCharLengthExpression(subExps);
+ case "UPPER":
+ return new BeamSqlUpperExpression(subExps);
+ case "LOWER":
+ return new BeamSqlLowerExpression(subExps);
+ case "TRIM":
+ return new BeamSqlTrimExpression(subExps);
+ case "SUBSTRING":
+ return new BeamSqlSubstringExpression(subExps);
+ case "OVERLAY":
+ return new BeamSqlOverlayExpression(subExps);
+ case "INITCAP":
+ return new BeamSqlInitCapExpression(subExps);
+
+ case "CASE":
+ return new BeamSqlCaseExpression(subExps);
+
+ case "IS NULL":
+ return new BeamSqlIsNullExpression(subExps.get(0));
+ case "IS NOT NULL":
+ return new BeamSqlIsNotNullExpression(subExps.get(0));
+
+ case "HOP":
+ case "TUMBLE":
+ case "SESSION":
+ return new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+ case "HOP_START":
+ case "TUMBLE_START":
+ case "SESSION_START":
+ return new BeamSqlWindowStartExpression();
+ case "HOP_END":
+ case "TUMBLE_END":
+ case "SESSION_END":
+ return new BeamSqlWindowEndExpression();
+ default:
+ //handle UDF
+ if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
+ SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
+ ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+ return new BeamSqlUdfExpression(fn.method, subExps,
+ ((RexCall) rexNode).type.getSqlTypeName());
+ } else {
+ throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
+ }
+ }
+ } else {
+ throw new BeamSqlUnsupportedException(
+ String.format("%s is not supported yet", rexNode.getClass().toString()));
+ }
+ }
+
+ @Override
+ public void prepare() {
+ }
+
+ @Override
+ public List<Object> execute(BeamSqlRow inputRecord) {
+ List<Object> results = new ArrayList<>();
+ for (BeamSqlExpression exp : exps) {
+ results.add(exp.evaluate(inputRecord).getValue());
+ }
+ return results;
+ }
+
+ @Override
+ public void close() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
index 55473b5..d7dc7d7 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
@@ -18,7 +18,7 @@
package org.apache.beam.dsls.sql.interpreter.operator;
import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -45,7 +45,7 @@ public class BeamSqlAndExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
boolean result = true;
for (BeamSqlExpression exp : operands) {
BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
index d108abd..a15c42e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -49,7 +49,7 @@ public class BeamSqlCaseExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
for (int i = 0; i < operands.size() - 1; i += 2) {
if (opValueEvaluated(i, inputRecord)) {
return BeamSqlPrimitive.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
index bfb798d..d75e13d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
import java.util.List;
import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -49,7 +49,7 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
Object rightValue = operands.get(1).evaluate(inputRecord).getValue();
switch (operands.get(0).outputType) {
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
index 811e21b..41dac76 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
import java.io.Serializable;
import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -49,7 +49,7 @@ public abstract class BeamSqlExpression implements Serializable{
return op(idx).getOutputType();
}
- public <T> T opValueEvaluated(int idx, BeamSQLRow row) {
+ public <T> T opValueEvaluated(int idx, BeamSqlRow row) {
return (T) op(idx).evaluate(row).getValue();
}
@@ -59,10 +59,10 @@ public abstract class BeamSqlExpression implements Serializable{
public abstract boolean accept();
/**
- * Apply input record {@link BeamSQLRow} to this expression,
+ * Apply input record {@link BeamSqlRow} to this expression,
* the output value is wrapped with {@link BeamSqlPrimitive}.
*/
- public abstract BeamSqlPrimitive evaluate(BeamSQLRow inputRecord);
+ public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRecord);
public List<BeamSqlExpression> getOperands() {
return operands;
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
index 612108f..3e99caf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.dsls.sql.interpreter.operator;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -38,7 +38,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression{
}
@Override
- public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
return BeamSqlPrimitive.of(outputType, inputRecord.getFieldValue(inputRef));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
index 784584e..e08e737 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -44,7 +44,7 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
index b09ddbf..d4e070d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -44,7 +44,7 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
index 4d07af8..e47ed45 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
@@ -18,7 +18,7 @@
package org.apache.beam.dsls.sql.interpreter.operator;
import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -45,7 +45,7 @@ public class BeamSqlOrExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
boolean result = false;
for (BeamSqlExpression exp : operands) {
BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index bc18c5e..d1fd886 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -24,13 +24,13 @@ import java.util.List;
import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
/**
* {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
- * It holds the value, and return it directly during {@link #evaluate(BeamSQLRow)}.
+ * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}.
*
*/
public class BeamSqlPrimitive<T> extends BeamSqlExpression{
@@ -141,7 +141,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression{
}
@Override
- public BeamSqlPrimitive<T> evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRecord) {
return this;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
index 389a87e..6f18307 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -51,7 +51,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
if (method == null) {
reConstructMethod();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
index 96ad81f..8bc090f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -18,7 +18,7 @@
package org.apache.beam.dsls.sql.interpreter.operator;
import java.util.Date;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -34,7 +34,7 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
new Date(inputRecord.getWindowEnd().getMillis()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
index 2fb9a48..eb4c03b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
import java.util.Date;
import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -42,7 +42,7 @@ public class BeamSqlWindowExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
(Date) operands.get(0).evaluate(inputRecord).getValue());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
index d0ac260..1e2c0a2 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -18,7 +18,7 @@
package org.apache.beam.dsls.sql.interpreter.operator;
import java.util.Date;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -35,7 +35,7 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression {
}
@Override
- public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+ public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
new Date(inputRecord.getWindowStart().getMillis()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index 5e1d068..69f6f10 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -55,7 +55,7 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
/**
* https://dev.mysql.com/doc/refman/5.7/en/arithmetic-functions.html.
*/
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
BeamSqlExpression leftOp = operands.get(0);
BeamSqlExpression rightOp = operands.get(1);
@@ -78,7 +78,7 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
}
}
- private double getDouble(BeamSQLRow inputRecord, BeamSqlExpression op) {
+ private double getDouble(BeamSqlRow inputRecord, BeamSqlExpression op) {
Object raw = op.evaluate(inputRecord).getValue();
Double ret = null;
if (SqlTypeName.NUMERIC_TYPES.contains(op.getOutputType())) {
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
index e34d4e4..a65333c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.dsls.sql.interpreter.operator.math;
import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -45,7 +45,7 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression {
return acceptance;
}
- @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
BeamSqlExpression operand = op(0);
return calculate(operand.evaluate(inputRecord));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
index 7dbd7f1..3ed9b80 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -33,7 +33,7 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.INTEGER);
}
- @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
String str = opValueEvaluated(0, inputRecord);
return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
index a56e9b1..e8e4e50 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -52,7 +52,7 @@ public class BeamSqlConcatExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
String left = opValueEvaluated(0, inputRecord);
String right = opValueEvaluated(1, inputRecord);
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
index 3d0125f..51dfe28 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -33,7 +33,7 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
String str = opValueEvaluated(0, inputRecord);
StringBuilder ret = new StringBuilder(str);
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
index 1855c65..f70fb1a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -33,7 +33,7 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
String str = opValueEvaluated(0, inputRecord);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
index 73f2591..20d9962 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -55,7 +55,7 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
String str = opValueEvaluated(0, inputRecord);
String replaceStr = opValueEvaluated(1, inputRecord);
int idx = opValueEvaluated(2, inputRecord);
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
index a5e8400..1d09b51 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -57,7 +57,7 @@ public class BeamSqlPositionExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
String targetStr = opValueEvaluated(0, inputRecord);
String containingStr = opValueEvaluated(1, inputRecord);
int from = -1;
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
index 554a3fc..d9bbc98 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -55,7 +55,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
String str = opValueEvaluated(0, inputRecord);
int idx = opValueEvaluated(1, inputRecord);
int startIdx = idx;
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
index d6cad74..d7c8a6a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -58,7 +58,7 @@ public class BeamSqlTrimExpression extends BeamSqlExpression {
return true;
}
- @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
if (operands.size() == 1) {
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
opValueEvaluated(0, inputRecord).toString().trim());
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
index d58a283..8fcaca4 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -33,7 +33,7 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
super(operands, SqlTypeName.VARCHAR);
}
- @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+ @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
String str = opValueEvaluated(0, inputRecord);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 6f148d6..98580cb 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -25,7 +25,7 @@ import java.util.Map;
import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -106,7 +106,7 @@ public class BeamQueryPlanner {
* which is linked with the given {@code pipeline}. The final output stream is returned as
* {@code PCollection} so more operations can be applied.
*/
- public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
+ public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
throws Exception {
BeamRelNode relNode = convertToBeamRel(sqlStatement);
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java
deleted file mode 100644
index 5e5f215..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utilities for {@code BeamRelNode}.
- */
-public class BeamSQLRelUtils {
- private static final Logger LOG = LoggerFactory.getLogger(BeamSQLRelUtils.class);
-
- private static final AtomicInteger sequence = new AtomicInteger(0);
- private static final AtomicInteger classSequence = new AtomicInteger(0);
-
- public static String getStageName(BeamRelNode relNode) {
- return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
- + sequence.getAndIncrement();
- }
-
- public static String getClassName(BeamRelNode relNode) {
- return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
- + "_" + classSequence.getAndIncrement();
- }
-
- public static BeamRelNode getBeamRelInput(RelNode input) {
- if (input instanceof RelSubset) {
- // go with known best input
- input = ((RelSubset) input).getBest();
- }
- return (BeamRelNode) input;
- }
-
- public static String explain(final RelNode rel) {
- return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
- }
-
- public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
- String explain = "";
- try {
- explain = RelOptUtil.toString(rel);
- } catch (StackOverflowError e) {
- LOG.error("StackOverflowError occurred while extracting plan. "
- + "Please report it to the dev@ mailing list.");
- LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
- LOG.error("Forcing plan to empty string and continue... "
- + "SQL Runner may not working properly after.");
- }
- return explain;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRelUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRelUtils.java
new file mode 100644
index 0000000..d9b6e17
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRelUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for {@code BeamRelNode}.
+ */
+public class BeamSqlRelUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class);
+
+ private static final AtomicInteger sequence = new AtomicInteger(0);
+ private static final AtomicInteger classSequence = new AtomicInteger(0);
+
+ public static String getStageName(BeamRelNode relNode) {
+ return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
+ + sequence.getAndIncrement();
+ }
+
+ public static String getClassName(BeamRelNode relNode) {
+ return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
+ + "_" + classSequence.getAndIncrement();
+ }
+
+ public static BeamRelNode getBeamRelInput(RelNode input) {
+ if (input instanceof RelSubset) {
+ // go with known best input
+ input = ((RelSubset) input).getBest();
+ }
+ return (BeamRelNode) input;
+ }
+
+ public static String explain(final RelNode rel) {
+ return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+ }
+
+ public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
+ String explain = "";
+ try {
+ explain = RelOptUtil.toString(rel);
+ } catch (StackOverflowError e) {
+ LOG.error("StackOverflowError occurred while extracting plan. "
+ + "Please report it to the dev@ mailing list.");
+ LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+ LOG.error("Forcing plan to empty string and continue... "
+ + "SQL Runner may not working properly after.");
+ }
+ return explain;
+ }
+}