You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:14 UTC
[51/59] beam git commit: move all implementation classes/packages into impl package
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java
deleted file mode 100644
index ed89c49..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Base class for the PI function.
- */
-public class BeamSqlPiExpression extends BeamSqlExpression {
-
- public BeamSqlPiExpression() {
- this.outputType = SqlTypeName.DOUBLE;
- }
-
- @Override public boolean accept() {
- return true;
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
- return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java
deleted file mode 100644
index e2bdd05..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathBinaryExpression} for 'POWER' function.
- */
-public class BeamSqlPowerExpression extends BeamSqlMathBinaryExpression {
-
- public BeamSqlPowerExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.DOUBLE);
- }
-
- @Override
- public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
- BeamSqlPrimitive rightOp) {
- return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions
- .power(SqlFunctions.toDouble(leftOp.getValue()),
- SqlFunctions.toDouble(rightOp.getValue())));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java
deleted file mode 100644
index d2d04c3..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'RADIANS' function.
- */
-public class BeamSqlRadiansExpression extends BeamSqlMathUnaryExpression {
-
- public BeamSqlRadiansExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.DOUBLE);
- }
-
- @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
- return BeamSqlPrimitive
- .of(SqlTypeName.DOUBLE, SqlFunctions.radians(SqlFunctions.toDouble(op.getValue())));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java
deleted file mode 100644
index 8df6f67..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import java.util.Random;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'RAND([seed])' function.
- */
-public class BeamSqlRandExpression extends BeamSqlExpression {
- private Random rand = new Random();
- private Integer seed = null;
-
- public BeamSqlRandExpression(List<BeamSqlExpression> subExps) {
- super(subExps, SqlTypeName.DOUBLE);
- }
-
- @Override
- public boolean accept() {
- return true;
- }
-
- @Override
- public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- if (operands.size() == 1) {
- int rowSeed = opValueEvaluated(0, inputRecord);
- if (seed == null || seed != rowSeed) {
- rand.setSeed(rowSeed);
- }
- }
- return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, rand.nextDouble());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java
deleted file mode 100644
index dfd76b8..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import java.util.Random;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'RAND_INTEGER([seed, ] numeric)'
- * function.
- */
-public class BeamSqlRandIntegerExpression extends BeamSqlExpression {
- private Random rand = new Random();
- private Integer seed = null;
-
- public BeamSqlRandIntegerExpression(List<BeamSqlExpression> subExps) {
- super(subExps, SqlTypeName.INTEGER);
- }
-
- @Override
- public boolean accept() {
- return true;
- }
-
- @Override
- public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
- int numericIdx = 0;
- if (operands.size() == 2) {
- int rowSeed = opValueEvaluated(0, inputRecord);
- if (seed == null || seed != rowSeed) {
- rand.setSeed(rowSeed);
- }
- numericIdx = 1;
- }
- return BeamSqlPrimitive.of(SqlTypeName.INTEGER,
- rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord)));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java
deleted file mode 100644
index 9349ce5..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.math.BigDecimal;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathBinaryExpression} for 'ROUND' function.
- */
-public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression {
-
- private final BeamSqlPrimitive zero = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0);
-
- public BeamSqlRoundExpression(List<BeamSqlExpression> operands) {
- super(operands, operands.get(0).getOutputType());
- checkForSecondOperand(operands);
- }
-
- private void checkForSecondOperand(List<BeamSqlExpression> operands) {
- if (numberOfOperands() == 1) {
- operands.add(1, zero);
- }
- }
-
- @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
- BeamSqlPrimitive rightOp) {
- BeamSqlPrimitive result = null;
- switch (leftOp.getOutputType()) {
- case SMALLINT:
- result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
- (short) roundInt(toInt(leftOp.getValue()), toInt(rightOp.getValue())));
- break;
- case TINYINT:
- result = BeamSqlPrimitive.of(SqlTypeName.TINYINT,
- (byte) roundInt(toInt(leftOp.getValue()), toInt(rightOp.getValue())));
- break;
- case INTEGER:
- result = BeamSqlPrimitive
- .of(SqlTypeName.INTEGER, roundInt(leftOp.getInteger(), toInt(rightOp.getValue())));
- break;
- case BIGINT:
- result = BeamSqlPrimitive
- .of(SqlTypeName.BIGINT, roundLong(leftOp.getLong(), toInt(rightOp.getValue())));
- break;
- case DOUBLE:
- result = BeamSqlPrimitive
- .of(SqlTypeName.DOUBLE, roundDouble(leftOp.getDouble(), toInt(rightOp.getValue())));
- break;
- case FLOAT:
- result = BeamSqlPrimitive.of(SqlTypeName.FLOAT,
- (float) roundDouble(leftOp.getFloat(), toInt(rightOp.getValue())));
- break;
- case DECIMAL:
- result = BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
- roundBigDecimal(toBigDecimal(leftOp.getValue()), toInt(rightOp.getValue())));
- break;
- default:
- break;
- }
- return result;
- }
-
- private int roundInt(int v1, int v2) {
- return SqlFunctions.sround(v1, v2);
- }
-
- private double roundDouble(double v1, int v2) {
- return SqlFunctions.sround(v1, v2);
- }
-
- private BigDecimal roundBigDecimal(BigDecimal v1, int v2) {
- return SqlFunctions.sround(v1, v2);
- }
-
- private long roundLong(long v1, int v2) {
- return SqlFunctions.sround(v1, v2);
- }
-
- private int toInt(Object value) {
- return SqlFunctions.toInt(value);
- }
-
- private BigDecimal toBigDecimal(Object value) {
- return SqlFunctions.toBigDecimal(value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java
deleted file mode 100644
index b26ef91..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'SIGN' function.
- */
-public class BeamSqlSignExpression extends BeamSqlMathUnaryExpression {
-
- public BeamSqlSignExpression(List<BeamSqlExpression> operands) {
- super(operands, operands.get(0).getOutputType());
- }
-
- @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
- BeamSqlPrimitive result = null;
- switch (op.getOutputType()) {
- case TINYINT:
- result = BeamSqlPrimitive
- .of(SqlTypeName.TINYINT, (byte) SqlFunctions.sign(SqlFunctions.toByte(op.getValue())));
- break;
- case SMALLINT:
- result = BeamSqlPrimitive
- .of(SqlTypeName.SMALLINT, (short) SqlFunctions.sign(SqlFunctions.toShort(op.getValue())));
- break;
- case INTEGER:
- result = BeamSqlPrimitive
- .of(SqlTypeName.INTEGER, SqlFunctions.sign(SqlFunctions.toInt(op.getValue())));
- break;
- case BIGINT:
- result = BeamSqlPrimitive
- .of(SqlTypeName.BIGINT, SqlFunctions.sign(SqlFunctions.toLong(op.getValue())));
- break;
- case FLOAT:
- result = BeamSqlPrimitive
- .of(SqlTypeName.FLOAT, (float) SqlFunctions.sign(SqlFunctions.toFloat(op.getValue())));
- break;
- case DOUBLE:
- result = BeamSqlPrimitive
- .of(SqlTypeName.DOUBLE, SqlFunctions.sign(SqlFunctions.toDouble(op.getValue())));
- break;
- case DECIMAL:
- result = BeamSqlPrimitive
- .of(SqlTypeName.DECIMAL, SqlFunctions.sign(SqlFunctions.toBigDecimal(op.getValue())));
- break;
- default:
- break;
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java
deleted file mode 100644
index 1b8023e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'SIN' function.
- */
-public class BeamSqlSinExpression extends BeamSqlMathUnaryExpression {
-
- public BeamSqlSinExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.DOUBLE);
- }
-
- @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
- return BeamSqlPrimitive
- .of(SqlTypeName.DOUBLE, SqlFunctions.sin(SqlFunctions.toDouble(op.getValue())));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java
deleted file mode 100644
index c86f8b9..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'TAN' function.
- */
-public class BeamSqlTanExpression extends BeamSqlMathUnaryExpression {
-
- public BeamSqlTanExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.DOUBLE);
- }
-
- @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
- return BeamSqlPrimitive
- .of(SqlTypeName.DOUBLE, SqlFunctions.tan(SqlFunctions.toDouble(op.getValue())));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java
deleted file mode 100644
index 8201360..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathBinaryExpression} for 'TRUNCATE' function.
- */
-public class BeamSqlTruncateExpression extends BeamSqlMathBinaryExpression {
-
- public BeamSqlTruncateExpression(List<BeamSqlExpression> operands) {
- super(operands, operands.get(0).getOutputType());
- }
-
- @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
- BeamSqlPrimitive rightOp) {
- BeamSqlPrimitive result = null;
- int rightIntOperand = SqlFunctions.toInt(rightOp.getValue());
- switch (leftOp.getOutputType()) {
- case SMALLINT:
- result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
- (short) SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand));
- break;
- case TINYINT:
- result = BeamSqlPrimitive.of(SqlTypeName.TINYINT,
- (byte) SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand));
- break;
- case INTEGER:
- result = BeamSqlPrimitive.of(SqlTypeName.INTEGER,
- SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand));
- break;
- case BIGINT:
- result = BeamSqlPrimitive
- .of(SqlTypeName.BIGINT, SqlFunctions.struncate(leftOp.getLong(), rightIntOperand));
- break;
- case FLOAT:
- result = BeamSqlPrimitive.of(SqlTypeName.FLOAT,
- (float) SqlFunctions.struncate(SqlFunctions.toFloat(leftOp.getValue()),
- rightIntOperand));
- break;
- case DOUBLE:
- result = BeamSqlPrimitive.of(SqlTypeName.DOUBLE,
- SqlFunctions.struncate(SqlFunctions.toDouble(leftOp.getValue()), rightIntOperand));
- break;
- case DECIMAL:
- result = BeamSqlPrimitive
- .of(SqlTypeName.DECIMAL, SqlFunctions.struncate(leftOp.getDecimal(), rightIntOperand));
- break;
- default:
- break;
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java
deleted file mode 100644
index 09c0780..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * MATH functions/operators.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java
deleted file mode 100644
index f913d7f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation for operators in {@link org.apache.calcite.sql.fun.SqlStdOperatorTable}.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
deleted file mode 100644
index 44ab804..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'CHAR_LENGTH' operator.
- */
-public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
- public BeamSqlCharLengthExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.INTEGER);
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
- String str = opValueEvaluated(0, inputRow);
- return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java
deleted file mode 100644
index bd298fc..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * String concat operator.
- */
-public class BeamSqlConcatExpression extends BeamSqlExpression {
-
- protected BeamSqlConcatExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
- super(operands, outputType);
- }
-
- public BeamSqlConcatExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override public boolean accept() {
- if (operands.size() != 2) {
- return false;
- }
-
- for (BeamSqlExpression exp : getOperands()) {
- if (!SqlTypeName.CHAR_TYPES.contains(exp.getOutputType())) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
- String left = opValueEvaluated(0, inputRow);
- String right = opValueEvaluated(1, inputRow);
-
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
- new StringBuilder(left.length() + right.length())
- .append(left).append(right).toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
deleted file mode 100644
index 79cd26f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'INITCAP' operator.
- */
-public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
- public BeamSqlInitCapExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
- String str = opValueEvaluated(0, inputRow);
-
- StringBuilder ret = new StringBuilder(str);
- boolean isInit = true;
- for (int i = 0; i < str.length(); i++) {
- if (Character.isWhitespace(str.charAt(i))) {
- isInit = true;
- continue;
- }
-
- if (isInit) {
- ret.setCharAt(i, Character.toUpperCase(str.charAt(i)));
- isInit = false;
- } else {
- ret.setCharAt(i, Character.toLowerCase(str.charAt(i)));
- }
- }
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ret.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java
deleted file mode 100644
index 384c012..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'LOWER' operator.
- */
-public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
- public BeamSqlLowerExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
- String str = opValueEvaluated(0, inputRow);
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
deleted file mode 100644
index 44e4624..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'OVERLAY' operator.
- *
- * <p>
- * OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ])
- * </p>
- */
-public class BeamSqlOverlayExpression extends BeamSqlExpression {
- public BeamSqlOverlayExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override public boolean accept() {
- if (operands.size() < 3 || operands.size() > 4) {
- return false;
- }
-
- if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
- || !SqlTypeName.CHAR_TYPES.contains(opType(1))
- || !SqlTypeName.INT_TYPES.contains(opType(2))) {
- return false;
- }
-
- if (operands.size() == 4 && !SqlTypeName.INT_TYPES.contains(opType(3))) {
- return false;
- }
-
- return true;
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
- String str = opValueEvaluated(0, inputRow);
- String replaceStr = opValueEvaluated(1, inputRow);
- int idx = opValueEvaluated(2, inputRow);
- // the index is 1 based.
- idx -= 1;
- int length = replaceStr.length();
- if (operands.size() == 4) {
- length = opValueEvaluated(3, inputRow);
- }
-
- StringBuilder result = new StringBuilder(
- str.length() + replaceStr.length() - length);
- result.append(str.substring(0, idx))
- .append(replaceStr)
- .append(str.substring(idx + length));
-
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java
deleted file mode 100644
index 683902c..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * String position operator.
- *
- * <p>
- * example:
- * POSITION(string1 IN string2)
- * POSITION(string1 IN string2 FROM integer)
- * </p>
- */
-public class BeamSqlPositionExpression extends BeamSqlExpression {
- public BeamSqlPositionExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.INTEGER);
- }
-
- @Override public boolean accept() {
- if (operands.size() < 2 || operands.size() > 3) {
- return false;
- }
-
- if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
- || !SqlTypeName.CHAR_TYPES.contains(opType(1))) {
- return false;
- }
-
- if (operands.size() == 3
- && !SqlTypeName.INT_TYPES.contains(opType(2))) {
- return false;
- }
-
- return true;
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
- String targetStr = opValueEvaluated(0, inputRow);
- String containingStr = opValueEvaluated(1, inputRow);
- int from = -1;
- if (operands.size() == 3) {
- Number tmp = opValueEvaluated(2, inputRow);
- from = tmp.intValue();
- }
-
- int idx = containingStr.indexOf(targetStr, from);
-
- return BeamSqlPrimitive.of(SqlTypeName.INTEGER, idx);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
deleted file mode 100644
index d6099ab..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Base class for all string unary operators.
- */
-public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression {
- public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
- super(operands, outputType);
- }
-
- @Override public boolean accept() {
- if (operands.size() != 1) {
- return false;
- }
-
- if (!SqlTypeName.CHAR_TYPES.contains(opType(0))) {
- return false;
- }
-
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
deleted file mode 100644
index 759bfa3..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'SUBSTRING' operator.
- *
- * <p>
- * SUBSTRING(string FROM integer)
- * SUBSTRING(string FROM integer FOR integer)
- * </p>
- */
-public class BeamSqlSubstringExpression extends BeamSqlExpression {
- public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override public boolean accept() {
- if (operands.size() < 2 || operands.size() > 3) {
- return false;
- }
-
- if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
- || !SqlTypeName.INT_TYPES.contains(opType(1))) {
- return false;
- }
-
- if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) {
- return false;
- }
-
- return true;
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
- String str = opValueEvaluated(0, inputRow);
- int idx = opValueEvaluated(1, inputRow);
- int startIdx = idx;
- if (startIdx > 0) {
- // NOTE: SQL substring is 1 based(rather than 0 based)
- startIdx -= 1;
- } else if (startIdx < 0) {
- // NOTE: SQL also support negative index...
- startIdx += str.length();
- } else {
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
- }
-
- if (operands.size() == 3) {
- int length = opValueEvaluated(2, inputRow);
- if (length < 0) {
- length = 0;
- }
- int endIdx = Math.min(startIdx + length, str.length());
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx, endIdx));
- } else {
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java
deleted file mode 100644
index 19d411b..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Trim operator.
- *
- * <p>
- * TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2)
- * </p>
- */
-public class BeamSqlTrimExpression extends BeamSqlExpression {
- public BeamSqlTrimExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override public boolean accept() {
- if (operands.size() != 1 && operands.size() != 3) {
- return false;
- }
-
- if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) {
- return false;
- }
-
- if (operands.size() == 3
- && (
- SqlTypeName.SYMBOL != opType(0)
- || !SqlTypeName.CHAR_TYPES.contains(opType(1))
- || !SqlTypeName.CHAR_TYPES.contains(opType(2)))
- ) {
- return false;
- }
-
- return true;
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
- if (operands.size() == 1) {
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
- opValueEvaluated(0, inputRow).toString().trim());
- } else {
- SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
- String targetStr = opValueEvaluated(1, inputRow);
- String containingStr = opValueEvaluated(2, inputRow);
-
- switch (type) {
- case LEADING:
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, leadingTrim(containingStr, targetStr));
- case TRAILING:
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, trailingTrim(containingStr, targetStr));
- case BOTH:
- default:
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
- trailingTrim(leadingTrim(containingStr, targetStr), targetStr));
- }
- }
- }
-
- static String leadingTrim(String containingStr, String targetStr) {
- int idx = 0;
- while (containingStr.startsWith(targetStr, idx)) {
- idx += targetStr.length();
- }
-
- return containingStr.substring(idx);
- }
-
- static String trailingTrim(String containingStr, String targetStr) {
- int idx = containingStr.length() - targetStr.length();
- while (containingStr.startsWith(targetStr, idx)) {
- idx -= targetStr.length();
- }
-
- idx += targetStr.length();
- return containingStr.substring(0, idx);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java
deleted file mode 100644
index cf27597..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'UPPER' operator.
- */
-public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
- public BeamSqlUpperExpression(List<BeamSqlExpression> operands) {
- super(operands, SqlTypeName.VARCHAR);
- }
-
- @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
- String str = opValueEvaluated(0, inputRow);
- return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java
deleted file mode 100644
index 8b55034..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * String operators.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java
deleted file mode 100644
index af3634a..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * interpreter generate runnable 'code' to execute SQL relational expressions.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java
deleted file mode 100644
index ba6235f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.planner;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The core component that handles a SQL statement end to end: it parses and validates the
- * query, converts it into a {@link BeamRelNode} tree, and builds a Beam pipeline from it.
- *
- */
-public class BeamQueryPlanner {
- private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
-
- protected final Planner planner;
- private Map<String, BaseBeamTable> sourceTables = new HashMap<>();
-
- public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
- RelDataTypeSystem.DEFAULT);
-
- public BeamQueryPlanner(SchemaPlus schema) {
- final List<RelTraitDef> traitDefs = new ArrayList<>();
- traitDefs.add(ConventionTraitDef.INSTANCE);
- traitDefs.add(RelCollationTraitDef.INSTANCE);
-
- List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
- sqlOperatorTables.add(SqlStdOperatorTable.instance());
- sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false,
- Collections.<String>emptyList(), TYPE_FACTORY));
-
- FrameworkConfig config = Frameworks.newConfigBuilder()
- .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
- .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
- .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
- .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
- .build();
- this.planner = Frameworks.getPlanner(config);
-
- for (String t : schema.getTableNames()) {
- sourceTables.put(t, (BaseBeamTable) schema.getTable(t));
- }
- }
-
- /**
- * Parses the input SQL query and returns the resulting {@link SqlNode} as a grammar tree.
- */
- public SqlNode parseQuery(String sqlQuery) throws SqlParseException{
- return planner.parse(sqlQuery);
- }
-
- /**
- * {@code compileBeamPipeline} translates a SQL statement into a Beam data flow
- * that is linked with the given {@code basePipeline}. The final output stream is returned as
- * a {@code PCollection} so that more operations can be applied.
- */
- public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline
- , BeamSqlEnv sqlEnv) throws Exception {
- BeamRelNode relNode = convertToBeamRel(sqlStatement);
-
- // The input PCollectionTuple is empty here; it is rebuilt in BeamIOSourceRel.
- return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv);
- }
-
- /**
- * Parses and validates the input query, then converts it into a
- * {@link BeamRelNode} tree.
- * The planner is closed afterwards, whether or not the conversion succeeded.
- */
- public BeamRelNode convertToBeamRel(String sqlStatement)
- throws ValidationException, RelConversionException, SqlParseException {
- BeamRelNode beamRelNode;
- try {
- beamRelNode = (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
- } finally {
- planner.close();
- }
- return beamRelNode;
- }
-
- private RelNode validateAndConvert(SqlNode sqlNode)
- throws ValidationException, RelConversionException {
- SqlNode validated = validateNode(sqlNode);
- LOG.info("SQL:\n" + validated);
- RelNode relNode = convertToRelNode(validated);
- return convertToBeamRel(relNode);
- }
-
- private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException {
- RelTraitSet traitSet = relNode.getTraitSet();
-
- LOG.info("SQLPlan>\n" + RelOptUtil.toString(relNode));
-
- // PlannerImpl.transform() optimizes the RelNode with the rule set.
- return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode);
- }
-
- private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
- return planner.rel(sqlNode).rel;
- }
-
- private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
- return planner.validate(sqlNode);
- }
-
- public Map<String, BaseBeamTable> getSourceTables() {
- return sourceTables;
- }
-
- public Planner getPlanner() {
- return planner;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java
deleted file mode 100644
index fba4638..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.planner;
-
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
-
-/**
- * Customized {@link RelDataTypeSystem} for Beam SQL.
- * It sets both the maximum numeric scale and the maximum numeric precision to 38.
- */
-public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl {
- public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem();
-
- @Override
- public int getMaxNumericScale() {
- return 38;
- }
-
- @Override
- public int getMaxNumericPrecision() {
- return 38;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java
deleted file mode 100644
index e907321..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.planner;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.Iterator;
-import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.rule.BeamAggregationRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamFilterRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamIOSinkRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamIOSourceRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamIntersectRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamJoinRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamMinusRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamProjectRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamSortRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamUnionRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamValuesRule;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.tools.RuleSet;
-
-/**
- * {@link RuleSet}s used in {@link BeamQueryPlanner}. The rules translate a standard
- * Calcite {@link RelNode} tree into a tree of {@link BeamRelNode}s.
- *
- */
-public class BeamRuleSets {
- private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet
- .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
- BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
- BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE,
- BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE,
- BeamJoinRule.INSTANCE)
- .build();
-
- public static RuleSet[] getRuleSets() {
- return new RuleSet[] { new BeamRuleSet(
- ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build()) };
- }
-
- private static class BeamRuleSet implements RuleSet {
- final ImmutableSet<RelOptRule> rules;
-
- public BeamRuleSet(ImmutableSet<RelOptRule> rules) {
- this.rules = rules;
- }
-
- public BeamRuleSet(ImmutableList<RelOptRule> rules) {
- this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build();
- }
-
- @Override
- public Iterator<RelOptRule> iterator() {
- return rules.iterator();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java
deleted file mode 100644
index 680ccbd..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner} is the main interface.
- * It defines data sources, validates a SQL statement, and converts it into a Beam
- * pipeline.
- */
-package org.apache.beam.sdk.extensions.sql.planner;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java
deleted file mode 100644
index 66ab892..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.WithTimestamps;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Util;
-import org.joda.time.Duration;
-
-/**
- * {@link BeamRelNode} to replace an {@link Aggregate} node.
- * It windows the input, keys each row via the group set, and combines the rows per key.
- */
-public class BeamAggregationRel extends Aggregate implements BeamRelNode {
- private int windowFieldIdx = -1;
- private WindowFn<BeamSqlRow, BoundedWindow> windowFn;
- private Trigger trigger;
- private Duration allowedLatence = Duration.ZERO;
-
- public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits
- , RelNode child, boolean indicator,
- ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls
- , WindowFn windowFn, Trigger trigger, int windowFieldIdx, Duration allowedLatence) {
- super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
- this.windowFn = windowFn;
- this.trigger = trigger;
- this.windowFieldIdx = windowFieldIdx;
- this.allowedLatence = allowedLatence;
- }
-
- @Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this) + "_";
-
- PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
- if (windowFieldIdx != -1) {
- upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps
- .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
- .setCoder(upstream.getCoder());
- }
-
- PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window",
- Window.into(windowFn)
- .triggering(trigger)
- .withAllowedLateness(allowedLatence)
- .accumulatingFiredPanes());
-
- BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
- PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply(
- stageName + "exCombineBy",
- WithKeys
- .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
- windowFieldIdx, groupSet)))
- .setCoder(KvCoder.of(keyCoder, upstream.getCoder()));
-
-
- BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
-
- PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply(
- stageName + "combineBy",
- Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
- new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
- CalciteUtils.toBeamRowType(input.getRowType()))))
- .setCoder(KvCoder.of(keyCoder, aggCoder));
-
- PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
- ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
- CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx)));
- mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
- return mergedStream;
- }
-
- /**
- * Row type of the sub-record used as the Group-By key; the window field is left out.
- */
- private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
- BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
- List<String> fieldNames = new ArrayList<>();
- List<Integer> fieldTypes = new ArrayList<>();
- for (int i : groupSet.asList()) {
- if (i != windowFieldIdx) {
- fieldNames.add(inputRowType.getFieldsName().get(i));
- fieldTypes.add(inputRowType.getFieldsType().get(i));
- }
- }
- return BeamSqlRowType.create(fieldNames, fieldTypes);
- }
-
- /**
- * Row type of the sub-record that holds one field per aggregation call.
- */
- private BeamSqlRowType exAggFieldsSchema() {
- List<String> fieldNames = new ArrayList<>();
- List<Integer> fieldTypes = new ArrayList<>();
- for (AggregateCall ac : getAggCallList()) {
- fieldNames.add(ac.name);
- fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
- }
-
- return BeamSqlRowType.create(fieldNames, fieldTypes);
- }
-
- @Override
- public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator
- , ImmutableBitSet groupSet,
- List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
- return new BeamAggregationRel(getCluster(), traitSet, input, indicator
- , groupSet, groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLatence);
- }
-
- public void setWindowFn(WindowFn windowFn) {
- this.windowFn = windowFn;
- }
-
- public void setTrigger(Trigger trigger) {
- this.trigger = trigger;
- }
-
- public RelWriter explainTerms(RelWriter pw) {
- // We skip the "groups" element if it is a singleton of "group".
- pw.item("group", groupSet)
- .itemIf("window", windowFn, windowFn != null)
- .itemIf("trigger", trigger, trigger != null)
- .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
- .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
- .itemIf("indicator", indicator, indicator)
- .itemIf("aggs", aggCalls, pw.nest());
- if (!pw.nest()) {
- for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
- pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
- }
- }
- return pw;
- }
-
-}