You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:14 UTC

[51/59] beam git commit: move all implementation classes/packages into impl package

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java
deleted file mode 100644
index ed89c49..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPiExpression.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Base class for the PI function.
- */
-public class BeamSqlPiExpression extends BeamSqlExpression {
-
-  public BeamSqlPiExpression() {
-    this.outputType = SqlTypeName.DOUBLE;
-  }
-
-  @Override public boolean accept() {
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java
deleted file mode 100644
index e2bdd05..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlPowerExpression.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathBinaryExpression} for 'POWER' function.
- */
-public class BeamSqlPowerExpression extends BeamSqlMathBinaryExpression {
-
-  public BeamSqlPowerExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.DOUBLE);
-  }
-
-  @Override
-  public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
-      BeamSqlPrimitive rightOp) {
-    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, SqlFunctions
-        .power(SqlFunctions.toDouble(leftOp.getValue()),
-            SqlFunctions.toDouble(rightOp.getValue())));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java
deleted file mode 100644
index d2d04c3..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRadiansExpression.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'RADIANS' function.
- */
-public class BeamSqlRadiansExpression extends BeamSqlMathUnaryExpression {
-
-  public BeamSqlRadiansExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.DOUBLE);
-  }
-
-  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
-    return BeamSqlPrimitive
-        .of(SqlTypeName.DOUBLE, SqlFunctions.radians(SqlFunctions.toDouble(op.getValue())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java
deleted file mode 100644
index 8df6f67..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandExpression.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import java.util.Random;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'RAND([seed])' function.
- */
-public class BeamSqlRandExpression extends BeamSqlExpression {
-  private Random rand = new Random();
-  private Integer seed = null;
-
-  public BeamSqlRandExpression(List<BeamSqlExpression> subExps) {
-    super(subExps, SqlTypeName.DOUBLE);
-  }
-
-  @Override
-  public boolean accept() {
-    return true;
-  }
-
-  @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    if (operands.size() == 1) {
-      int rowSeed = opValueEvaluated(0, inputRecord);
-      if (seed == null || seed != rowSeed) {
-        rand.setSeed(rowSeed);
-      }
-    }
-    return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, rand.nextDouble());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java
deleted file mode 100644
index dfd76b8..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import java.util.Random;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'RAND_INTEGER([seed, ] numeric)'
- * function.
- */
-public class BeamSqlRandIntegerExpression extends BeamSqlExpression {
-  private Random rand = new Random();
-  private Integer seed = null;
-
-  public BeamSqlRandIntegerExpression(List<BeamSqlExpression> subExps) {
-    super(subExps, SqlTypeName.INTEGER);
-  }
-
-  @Override
-  public boolean accept() {
-    return true;
-  }
-
-  @Override
-  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
-    int numericIdx = 0;
-    if (operands.size() == 2) {
-      int rowSeed = opValueEvaluated(0, inputRecord);
-      if (seed == null || seed != rowSeed) {
-        rand.setSeed(rowSeed);
-      }
-      numericIdx = 1;
-    }
-    return BeamSqlPrimitive.of(SqlTypeName.INTEGER,
-        rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java
deleted file mode 100644
index 9349ce5..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlRoundExpression.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.math.BigDecimal;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathBinaryExpression} for 'ROUND' function.
- */
-public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression {
-
-  private final BeamSqlPrimitive zero = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 0);
-
-  public BeamSqlRoundExpression(List<BeamSqlExpression> operands) {
-    super(operands, operands.get(0).getOutputType());
-    checkForSecondOperand(operands);
-  }
-
-  private void checkForSecondOperand(List<BeamSqlExpression> operands) {
-    if (numberOfOperands() == 1) {
-      operands.add(1, zero);
-    }
-  }
-
-  @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
-      BeamSqlPrimitive rightOp) {
-    BeamSqlPrimitive result = null;
-    switch (leftOp.getOutputType()) {
-      case SMALLINT:
-        result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
-            (short) roundInt(toInt(leftOp.getValue()), toInt(rightOp.getValue())));
-        break;
-      case TINYINT:
-        result = BeamSqlPrimitive.of(SqlTypeName.TINYINT,
-            (byte) roundInt(toInt(leftOp.getValue()), toInt(rightOp.getValue())));
-        break;
-      case INTEGER:
-        result = BeamSqlPrimitive
-            .of(SqlTypeName.INTEGER, roundInt(leftOp.getInteger(), toInt(rightOp.getValue())));
-        break;
-      case BIGINT:
-        result = BeamSqlPrimitive
-            .of(SqlTypeName.BIGINT, roundLong(leftOp.getLong(), toInt(rightOp.getValue())));
-        break;
-      case DOUBLE:
-        result = BeamSqlPrimitive
-            .of(SqlTypeName.DOUBLE, roundDouble(leftOp.getDouble(), toInt(rightOp.getValue())));
-        break;
-      case FLOAT:
-        result = BeamSqlPrimitive.of(SqlTypeName.FLOAT,
-            (float) roundDouble(leftOp.getFloat(), toInt(rightOp.getValue())));
-        break;
-      case DECIMAL:
-        result = BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
-            roundBigDecimal(toBigDecimal(leftOp.getValue()), toInt(rightOp.getValue())));
-        break;
-      default:
-        break;
-    }
-    return result;
-  }
-
-  private int roundInt(int v1, int v2) {
-    return SqlFunctions.sround(v1, v2);
-  }
-
-  private double roundDouble(double v1, int v2) {
-    return SqlFunctions.sround(v1, v2);
-  }
-
-  private BigDecimal roundBigDecimal(BigDecimal v1, int v2) {
-    return SqlFunctions.sround(v1, v2);
-  }
-
-  private long roundLong(long v1, int v2) {
-    return SqlFunctions.sround(v1, v2);
-  }
-
-  private int toInt(Object value) {
-    return SqlFunctions.toInt(value);
-  }
-
-  private BigDecimal toBigDecimal(Object value) {
-    return SqlFunctions.toBigDecimal(value);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java
deleted file mode 100644
index b26ef91..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSignExpression.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'SIGN' function.
- */
-public class BeamSqlSignExpression extends BeamSqlMathUnaryExpression {
-
-  public BeamSqlSignExpression(List<BeamSqlExpression> operands) {
-    super(operands, operands.get(0).getOutputType());
-  }
-
-  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
-    BeamSqlPrimitive result = null;
-    switch (op.getOutputType()) {
-      case TINYINT:
-        result = BeamSqlPrimitive
-          .of(SqlTypeName.TINYINT, (byte) SqlFunctions.sign(SqlFunctions.toByte(op.getValue())));
-        break;
-      case SMALLINT:
-        result = BeamSqlPrimitive
-          .of(SqlTypeName.SMALLINT, (short) SqlFunctions.sign(SqlFunctions.toShort(op.getValue())));
-        break;
-      case INTEGER:
-        result = BeamSqlPrimitive
-            .of(SqlTypeName.INTEGER, SqlFunctions.sign(SqlFunctions.toInt(op.getValue())));
-        break;
-      case BIGINT:
-        result = BeamSqlPrimitive
-            .of(SqlTypeName.BIGINT, SqlFunctions.sign(SqlFunctions.toLong(op.getValue())));
-        break;
-      case FLOAT:
-        result = BeamSqlPrimitive
-            .of(SqlTypeName.FLOAT, (float) SqlFunctions.sign(SqlFunctions.toFloat(op.getValue())));
-        break;
-      case DOUBLE:
-        result = BeamSqlPrimitive
-            .of(SqlTypeName.DOUBLE, SqlFunctions.sign(SqlFunctions.toDouble(op.getValue())));
-        break;
-      case DECIMAL:
-        result = BeamSqlPrimitive
-            .of(SqlTypeName.DECIMAL, SqlFunctions.sign(SqlFunctions.toBigDecimal(op.getValue())));
-        break;
-      default:
-        break;
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java
deleted file mode 100644
index 1b8023e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlSinExpression.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'SIN' function.
- */
-public class BeamSqlSinExpression extends BeamSqlMathUnaryExpression {
-
-  public BeamSqlSinExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.DOUBLE);
-  }
-
-  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
-    return BeamSqlPrimitive
-        .of(SqlTypeName.DOUBLE, SqlFunctions.sin(SqlFunctions.toDouble(op.getValue())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java
deleted file mode 100644
index c86f8b9..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTanExpression.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathUnaryExpression} for 'TAN' function.
- */
-public class BeamSqlTanExpression extends BeamSqlMathUnaryExpression {
-
-  public BeamSqlTanExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.DOUBLE);
-  }
-
-  @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive op) {
-    return BeamSqlPrimitive
-        .of(SqlTypeName.DOUBLE, SqlFunctions.tan(SqlFunctions.toDouble(op.getValue())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java
deleted file mode 100644
index 8201360..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/BeamSqlTruncateExpression.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.calcite.runtime.SqlFunctions;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlMathBinaryExpression} for 'TRUNCATE' function.
- */
-public class BeamSqlTruncateExpression extends BeamSqlMathBinaryExpression {
-
-  public BeamSqlTruncateExpression(List<BeamSqlExpression> operands) {
-    super(operands, operands.get(0).getOutputType());
-  }
-
-  @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
-      BeamSqlPrimitive rightOp) {
-    BeamSqlPrimitive result = null;
-    int rightIntOperand = SqlFunctions.toInt(rightOp.getValue());
-    switch (leftOp.getOutputType()) {
-      case SMALLINT:
-        result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
-            (short) SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand));
-        break;
-      case TINYINT:
-        result = BeamSqlPrimitive.of(SqlTypeName.TINYINT,
-            (byte) SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand));
-        break;
-      case INTEGER:
-        result = BeamSqlPrimitive.of(SqlTypeName.INTEGER,
-            SqlFunctions.struncate(SqlFunctions.toInt(leftOp.getValue()), rightIntOperand));
-        break;
-      case BIGINT:
-        result = BeamSqlPrimitive
-            .of(SqlTypeName.BIGINT, SqlFunctions.struncate(leftOp.getLong(), rightIntOperand));
-        break;
-      case FLOAT:
-        result = BeamSqlPrimitive.of(SqlTypeName.FLOAT,
-            (float) SqlFunctions.struncate(SqlFunctions.toFloat(leftOp.getValue()),
-                rightIntOperand));
-        break;
-      case DOUBLE:
-        result = BeamSqlPrimitive.of(SqlTypeName.DOUBLE,
-            SqlFunctions.struncate(SqlFunctions.toDouble(leftOp.getValue()), rightIntOperand));
-        break;
-      case DECIMAL:
-        result = BeamSqlPrimitive
-            .of(SqlTypeName.DECIMAL, SqlFunctions.struncate(leftOp.getDecimal(), rightIntOperand));
-        break;
-      default:
-        break;
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java
deleted file mode 100644
index 09c0780..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/math/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * MATH functions/operators.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.math;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java
deleted file mode 100644
index f913d7f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation for operators in {@link org.apache.calcite.sql.fun.SqlStdOperatorTable}.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
deleted file mode 100644
index 44ab804..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'CHAR_LENGTH' operator.
- */
-public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
-  public BeamSqlCharLengthExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.INTEGER);
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String str = opValueEvaluated(0, inputRow);
-    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java
deleted file mode 100644
index bd298fc..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlConcatExpression.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * String concat operator.
- */
-public class BeamSqlConcatExpression extends BeamSqlExpression {
-
-  protected BeamSqlConcatExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  public BeamSqlConcatExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() != 2) {
-      return false;
-    }
-
-    for (BeamSqlExpression exp : getOperands()) {
-      if (!SqlTypeName.CHAR_TYPES.contains(exp.getOutputType())) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String left = opValueEvaluated(0, inputRow);
-    String right = opValueEvaluated(1, inputRow);
-
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-        new StringBuilder(left.length() + right.length())
-            .append(left).append(right).toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
deleted file mode 100644
index 79cd26f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'INITCAP' operator.
- */
-public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
-  public BeamSqlInitCapExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String str = opValueEvaluated(0, inputRow);
-
-    StringBuilder ret = new StringBuilder(str);
-    boolean isInit = true;
-    for (int i = 0; i < str.length(); i++) {
-      if (Character.isWhitespace(str.charAt(i))) {
-        isInit = true;
-        continue;
-      }
-
-      if (isInit) {
-        ret.setCharAt(i, Character.toUpperCase(str.charAt(i)));
-        isInit = false;
-      } else {
-        ret.setCharAt(i, Character.toLowerCase(str.charAt(i)));
-      }
-    }
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ret.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java
deleted file mode 100644
index 384c012..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlLowerExpression.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'LOWER' operator.
- */
-public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
-  public BeamSqlLowerExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String str = opValueEvaluated(0, inputRow);
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
deleted file mode 100644
index 44e4624..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'OVERLAY' operator.
- *
- * <p>
- *   OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ])
- * </p>
- */
-public class BeamSqlOverlayExpression extends BeamSqlExpression {
-  public BeamSqlOverlayExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() < 3 || operands.size() > 4) {
-      return false;
-    }
-
-    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
-        || !SqlTypeName.CHAR_TYPES.contains(opType(1))
-        || !SqlTypeName.INT_TYPES.contains(opType(2))) {
-      return false;
-    }
-
-    if (operands.size() == 4 && !SqlTypeName.INT_TYPES.contains(opType(3))) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String str = opValueEvaluated(0, inputRow);
-    String replaceStr = opValueEvaluated(1, inputRow);
-    int idx = opValueEvaluated(2, inputRow);
-    // the index is 1 based.
-    idx -= 1;
-    int length = replaceStr.length();
-    if (operands.size() == 4) {
-      length = opValueEvaluated(3, inputRow);
-    }
-
-    StringBuilder result = new StringBuilder(
-        str.length() + replaceStr.length() - length);
-    result.append(str.substring(0, idx))
-        .append(replaceStr)
-        .append(str.substring(idx + length));
-
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java
deleted file mode 100644
index 683902c..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlPositionExpression.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * String position operator.
- *
- * <p>
- *   example:
- *     POSITION(string1 IN string2)
- *     POSITION(string1 IN string2 FROM integer)
- * </p>
- */
-public class BeamSqlPositionExpression extends BeamSqlExpression {
-  public BeamSqlPositionExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.INTEGER);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() < 2 || operands.size() > 3) {
-      return false;
-    }
-
-    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
-        || !SqlTypeName.CHAR_TYPES.contains(opType(1))) {
-      return false;
-    }
-
-    if (operands.size() == 3
-        && !SqlTypeName.INT_TYPES.contains(opType(2))) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String targetStr = opValueEvaluated(0, inputRow);
-    String containingStr = opValueEvaluated(1, inputRow);
-    int from = -1;
-    if (operands.size() == 3) {
-      Number tmp = opValueEvaluated(2, inputRow);
-      from = tmp.intValue();
-    }
-
-    int idx = containingStr.indexOf(targetStr, from);
-
-    return BeamSqlPrimitive.of(SqlTypeName.INTEGER, idx);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
deleted file mode 100644
index d6099ab..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlStringUnaryExpression.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Base class for all string unary operators.
- */
-public abstract class BeamSqlStringUnaryExpression extends BeamSqlExpression {
-  public BeamSqlStringUnaryExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
-    super(operands, outputType);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() != 1) {
-      return false;
-    }
-
-    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))) {
-      return false;
-    }
-
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
deleted file mode 100644
index 759bfa3..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'SUBSTRING' operator.
- *
- * <p>
- *   SUBSTRING(string FROM integer)
- *   SUBSTRING(string FROM integer FOR integer)
- * </p>
- */
-public class BeamSqlSubstringExpression extends BeamSqlExpression {
-  public BeamSqlSubstringExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() < 2 || operands.size() > 3) {
-      return false;
-    }
-
-    if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
-        || !SqlTypeName.INT_TYPES.contains(opType(1))) {
-      return false;
-    }
-
-    if (operands.size() == 3 && !SqlTypeName.INT_TYPES.contains(opType(2))) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String str = opValueEvaluated(0, inputRow);
-    int idx = opValueEvaluated(1, inputRow);
-    int startIdx = idx;
-    if (startIdx > 0) {
-      // NOTE: SQL substring is 1 based(rather than 0 based)
-      startIdx -= 1;
-    } else if (startIdx < 0) {
-      // NOTE: SQL also support negative index...
-      startIdx += str.length();
-    } else {
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "");
-    }
-
-    if (operands.size() == 3) {
-      int length = opValueEvaluated(2, inputRow);
-      if (length < 0) {
-        length = 0;
-      }
-      int endIdx = Math.min(startIdx + length, str.length());
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx, endIdx));
-    } else {
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.substring(startIdx));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java
deleted file mode 100644
index 19d411b..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlTrimExpression.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Trim operator.
- *
- * <p>
- * TRIM( { BOTH | LEADING | TRAILING } string1 FROM string2)
- * </p>
- */
-public class BeamSqlTrimExpression extends BeamSqlExpression {
-  public BeamSqlTrimExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public boolean accept() {
-    if (operands.size() != 1 && operands.size() != 3) {
-      return false;
-    }
-
-    if (operands.size() == 1 && !SqlTypeName.CHAR_TYPES.contains(opType(0))) {
-      return false;
-    }
-
-    if (operands.size() == 3
-        && (
-        SqlTypeName.SYMBOL != opType(0)
-            || !SqlTypeName.CHAR_TYPES.contains(opType(1))
-            || !SqlTypeName.CHAR_TYPES.contains(opType(2)))
-        ) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    if (operands.size() == 1) {
-      return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-          opValueEvaluated(0, inputRow).toString().trim());
-    } else {
-      SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow);
-      String targetStr = opValueEvaluated(1, inputRow);
-      String containingStr = opValueEvaluated(2, inputRow);
-
-      switch (type) {
-        case LEADING:
-          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, leadingTrim(containingStr, targetStr));
-        case TRAILING:
-          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, trailingTrim(containingStr, targetStr));
-        case BOTH:
-        default:
-          return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
-              trailingTrim(leadingTrim(containingStr, targetStr), targetStr));
-      }
-    }
-  }
-
-  static String leadingTrim(String containingStr, String targetStr) {
-    int idx = 0;
-    while (containingStr.startsWith(targetStr, idx)) {
-      idx += targetStr.length();
-    }
-
-    return containingStr.substring(idx);
-  }
-
-  static String trailingTrim(String containingStr, String targetStr) {
-    int idx = containingStr.length() - targetStr.length();
-    while (containingStr.startsWith(targetStr, idx)) {
-      idx -= targetStr.length();
-    }
-
-    idx += targetStr.length();
-    return containingStr.substring(0, idx);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java
deleted file mode 100644
index cf27597..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/BeamSqlUpperExpression.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * 'UPPER' operator.
- */
-public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
-  public BeamSqlUpperExpression(List<BeamSqlExpression> operands) {
-    super(operands, SqlTypeName.VARCHAR);
-  }
-
-  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
-    String str = opValueEvaluated(0, inputRow);
-    return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java
deleted file mode 100644
index 8b55034..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/string/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * String operators.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter.operator.string;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java
deleted file mode 100644
index af3634a..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * interpreter generate runnable 'code' to execute SQL relational expressions.
- */
-package org.apache.beam.sdk.extensions.sql.interpreter;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java
deleted file mode 100644
index ba6235f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamQueryPlanner.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.planner;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorTable;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The core component to handle through a SQL statement, from explain execution plan,
- * to generate a Beam pipeline.
- *
- */
-public class BeamQueryPlanner {
-  private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
-
-  protected final Planner planner;
-  private Map<String, BaseBeamTable> sourceTables = new HashMap<>();
-
-  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
-      RelDataTypeSystem.DEFAULT);
-
-  public BeamQueryPlanner(SchemaPlus schema) {
-    final List<RelTraitDef> traitDefs = new ArrayList<>();
-    traitDefs.add(ConventionTraitDef.INSTANCE);
-    traitDefs.add(RelCollationTraitDef.INSTANCE);
-
-    List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
-    sqlOperatorTables.add(SqlStdOperatorTable.instance());
-    sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false,
-        Collections.<String>emptyList(), TYPE_FACTORY));
-
-    FrameworkConfig config = Frameworks.newConfigBuilder()
-        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
-        .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
-        .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
-        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
-        .build();
-    this.planner = Frameworks.getPlanner(config);
-
-    for (String t : schema.getTableNames()) {
-      sourceTables.put(t, (BaseBeamTable) schema.getTable(t));
-    }
-  }
-
-  /**
-   * Parse input SQL query, and return a {@link SqlNode} as grammar tree.
-   */
-  public SqlNode parseQuery(String sqlQuery) throws SqlParseException{
-    return planner.parse(sqlQuery);
-  }
-
-  /**
-   * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow,
-   * which is linked with the given {@code pipeline}. The final output stream is returned as
-   * {@code PCollection} so more operations can be applied.
-   */
-  public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline
-      , BeamSqlEnv sqlEnv) throws Exception {
-    BeamRelNode relNode = convertToBeamRel(sqlStatement);
-
-    // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel.
-    return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv);
-  }
-
-  /**
-   * It parses and validate the input query, then convert into a
-   * {@link BeamRelNode} tree.
-   *
-   */
-  public BeamRelNode convertToBeamRel(String sqlStatement)
-      throws ValidationException, RelConversionException, SqlParseException {
-    BeamRelNode beamRelNode;
-    try {
-      beamRelNode = (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
-    } finally {
-      planner.close();
-    }
-    return beamRelNode;
-  }
-
-  private RelNode validateAndConvert(SqlNode sqlNode)
-      throws ValidationException, RelConversionException {
-    SqlNode validated = validateNode(sqlNode);
-    LOG.info("SQL:\n" + validated);
-    RelNode relNode = convertToRelNode(validated);
-    return convertToBeamRel(relNode);
-  }
-
-  private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException {
-    RelTraitSet traitSet = relNode.getTraitSet();
-
-    LOG.info("SQLPlan>\n" + RelOptUtil.toString(relNode));
-
-    // PlannerImpl.transform() optimizes RelNode with ruleset
-    return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode);
-  }
-
-  private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
-    return planner.rel(sqlNode).rel;
-  }
-
-  private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
-    return planner.validate(sqlNode);
-  }
-
-  public Map<String, BaseBeamTable> getSourceTables() {
-    return sourceTables;
-  }
-
-  public Planner getPlanner() {
-    return planner;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java
deleted file mode 100644
index fba4638..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRelDataTypeSystem.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.planner;
-
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
-
-/**
- * customized data type in Beam.
- *
- */
-public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl {
-  public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem();
-
-  @Override
-  public int getMaxNumericScale() {
-    return 38;
-  }
-
-  @Override
-  public int getMaxNumericPrecision() {
-    return 38;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java
deleted file mode 100644
index e907321..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/BeamRuleSets.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.planner;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.Iterator;
-import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.rule.BeamAggregationRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamFilterRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamIOSinkRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamIOSourceRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamIntersectRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamJoinRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamMinusRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamProjectRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamSortRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamUnionRule;
-import org.apache.beam.sdk.extensions.sql.rule.BeamValuesRule;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.tools.RuleSet;
-
-/**
- * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard
- * Calcite {@link RelNode} tree, to represent with {@link BeamRelNode}
- *
- */
-public class BeamRuleSets {
-  private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet
-      .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
-          BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
-          BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE,
-          BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE,
-          BeamJoinRule.INSTANCE)
-      .build();
-
-  public static RuleSet[] getRuleSets() {
-    return new RuleSet[] { new BeamRuleSet(
-        ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build()) };
-  }
-
-  private static class BeamRuleSet implements RuleSet {
-    final ImmutableSet<RelOptRule> rules;
-
-    public BeamRuleSet(ImmutableSet<RelOptRule> rules) {
-      this.rules = rules;
-    }
-
-    public BeamRuleSet(ImmutableList<RelOptRule> rules) {
-      this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build();
-    }
-
-    @Override
-    public Iterator<RelOptRule> iterator() {
-      return rules.iterator();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java
deleted file mode 100644
index 680ccbd..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/planner/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner} is the main interface.
- * It defines data sources, validate a SQL statement, and convert it as a Beam
- * pipeline.
- */
-package org.apache.beam.sdk.extensions.sql.planner;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java
deleted file mode 100644
index 66ab892..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamAggregationRel.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.WithTimestamps;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Util;
-import org.joda.time.Duration;
-
-/**
- * {@link BeamRelNode} to replace a {@link Aggregate} node.
- *
- */
-public class BeamAggregationRel extends Aggregate implements BeamRelNode {
-  private int windowFieldIdx = -1;
-  private WindowFn<BeamSqlRow, BoundedWindow> windowFn;
-  private Trigger trigger;
-  private Duration allowedLatence = Duration.ZERO;
-
-  public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits
-      , RelNode child, boolean indicator,
-      ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls
-      , WindowFn windowFn, Trigger trigger, int windowFieldIdx, Duration allowedLatence) {
-    super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
-    this.windowFn = windowFn;
-    this.trigger = trigger;
-    this.windowFieldIdx = windowFieldIdx;
-    this.allowedLatence = allowedLatence;
-  }
-
-  @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    RelNode input = getInput();
-    String stageName = BeamSqlRelUtils.getStageName(this) + "_";
-
-    PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
-    if (windowFieldIdx != -1) {
-      upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps
-          .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
-          .setCoder(upstream.getCoder());
-    }
-
-    PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window",
-        Window.into(windowFn)
-        .triggering(trigger)
-        .withAllowedLateness(allowedLatence)
-        .accumulatingFiredPanes());
-
-    BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply(
-        stageName + "exCombineBy",
-        WithKeys
-            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
-                windowFieldIdx, groupSet)))
-        .setCoder(KvCoder.of(keyCoder, upstream.getCoder()));
-
-
-    BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
-
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply(
-        stageName + "combineBy",
-        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
-            new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
-                CalciteUtils.toBeamRowType(input.getRowType()))))
-        .setCoder(KvCoder.of(keyCoder, aggCoder));
-
-    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
-        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
-            CalciteUtils.toBeamRowType(getRowType()), getAggCallList(), windowFieldIdx)));
-    mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
-    return mergedStream;
-  }
-
-  /**
-   * Type of sub-rowrecord used as Group-By keys.
-   */
-  private BeamSqlRowType exKeyFieldsSchema(RelDataType relDataType) {
-    BeamSqlRowType inputRowType = CalciteUtils.toBeamRowType(relDataType);
-    List<String> fieldNames = new ArrayList<>();
-    List<Integer> fieldTypes = new ArrayList<>();
-    for (int i : groupSet.asList()) {
-      if (i != windowFieldIdx) {
-        fieldNames.add(inputRowType.getFieldsName().get(i));
-        fieldTypes.add(inputRowType.getFieldsType().get(i));
-      }
-    }
-    return BeamSqlRowType.create(fieldNames, fieldTypes);
-  }
-
-  /**
-   * Type of sub-rowrecord, that represents the list of aggregation fields.
-   */
-  private BeamSqlRowType exAggFieldsSchema() {
-    List<String> fieldNames = new ArrayList<>();
-    List<Integer> fieldTypes = new ArrayList<>();
-    for (AggregateCall ac : getAggCallList()) {
-      fieldNames.add(ac.name);
-      fieldTypes.add(CalciteUtils.toJavaType(ac.type.getSqlTypeName()));
-    }
-
-    return BeamSqlRowType.create(fieldNames, fieldTypes);
-  }
-
-  @Override
-  public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator
-      , ImmutableBitSet groupSet,
-      List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
-    return new BeamAggregationRel(getCluster(), traitSet, input, indicator
-        , groupSet, groupSets, aggCalls, windowFn, trigger, windowFieldIdx, allowedLatence);
-  }
-
-  public void setWindowFn(WindowFn windowFn) {
-    this.windowFn = windowFn;
-  }
-
-  public void setTrigger(Trigger trigger) {
-    this.trigger = trigger;
-  }
-
-  public RelWriter explainTerms(RelWriter pw) {
-    // We skip the "groups" element if it is a singleton of "group".
-    pw.item("group", groupSet)
-        .itemIf("window", windowFn, windowFn != null)
-        .itemIf("trigger", trigger, trigger != null)
-        .itemIf("event_time", windowFieldIdx, windowFieldIdx != -1)
-        .itemIf("groups", groupSets, getGroupType() != Group.SIMPLE)
-        .itemIf("indicator", indicator, indicator)
-        .itemIf("aggs", aggCalls, pw.nest());
-    if (!pw.nest()) {
-      for (Ord<AggregateCall> ord : Ord.zip(aggCalls)) {
-        pw.item(Util.first(ord.e.name, "agg#" + ord.i), ord.e);
-      }
-    }
-    return pw;
-  }
-
-}