You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 20:01:26 UTC
[1/3] beam git commit: [BEAM-301] Initial skeleton for Beam SQL
Repository: beam
Updated Branches:
refs/heads/DSL_SQL 3625dbd9e -> f1c2b6540
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
new file mode 100644
index 0000000..3816063
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.beam.dsls.sql.planner.BeamQueryPlanner;
+
+/**
+ * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
+ */
+public abstract class BaseBeamTable implements ScannableTable, Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -1262988061830914193L;
+ private RelDataType relDataType;
+
+ protected BeamSQLRecordType beamSqlRecordType;
+
+ public BaseBeamTable(RelProtoDataType protoRowType) {
+ this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY);
+ this.beamSqlRecordType = BeamSQLRecordType.from(relDataType);
+ }
+
+ /**
+ * In Beam SQL, there's no difference between a batch query and a streaming
+ * query. {@link BeamIOType} is used to validate the sources.
+ */
+ public abstract BeamIOType getSourceType();
+
+ /**
+ * create a {@code IO.read()} instance to read from source.
+ *
+ */
+ public abstract PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader();
+
+ /**
+ * create a {@code IO.write()} instance to write to target.
+ *
+ */
+ public abstract PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter();
+
+ @Override
+ public Enumerable<Object[]> scan(DataContext root) {
+ // not used as Beam SQL uses its own execution engine
+ return null;
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return relDataType;
+ }
+
+ /**
+ * Not used {@link Statistic} to optimize the plan.
+ */
+ @Override
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ /**
+ * all sources are treated as TABLE in Beam SQL.
+ */
+ @Override
+ public TableType getJdbcTableType() {
+ return TableType.TABLE;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
new file mode 100644
index 0000000..5e55b0f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Type as a source IO, determined whether it's a STREAMING process, or batch
+ * process.
+ */
+public enum BeamIOType implements Serializable {
+ BOUNDED, UNBOUNDED;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
new file mode 100644
index 0000000..dc8e381
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Field type information in {@link BeamSQLRow}.
+ *
+ */
+//@DefaultCoder(BeamSQLRecordTypeCoder.class)
+public class BeamSQLRecordType implements Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -5318734648766104712L;
+ private List<String> fieldsName = new ArrayList<>();
+ private List<SqlTypeName> fieldsType = new ArrayList<>();
+
+ public static BeamSQLRecordType from(RelDataType tableInfo) {
+ BeamSQLRecordType record = new BeamSQLRecordType();
+ for (RelDataTypeField f : tableInfo.getFieldList()) {
+ record.fieldsName.add(f.getName());
+ record.fieldsType.add(f.getType().getSqlTypeName());
+ }
+ return record;
+ }
+
+ public int size() {
+ return fieldsName.size();
+ }
+
+ public List<String> getFieldsName() {
+ return fieldsName;
+ }
+
+ public void setFieldsName(List<String> fieldsName) {
+ this.fieldsName = fieldsName;
+ }
+
+ public List<SqlTypeName> getFieldsType() {
+ return fieldsType;
+ }
+
+ public void setFieldsType(List<SqlTypeName> fieldsType) {
+ this.fieldsType = fieldsType;
+ }
+
+ @Override
+ public String toString() {
+ return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
new file mode 100644
index 0000000..2989cb9
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * A {@link Coder} for {@link BeamSQLRecordType}.
+ *
+ */
+public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> {
+ private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+ private static final VarIntCoder intCoder = VarIntCoder.of();
+
+ private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder();
+ private BeamSQLRecordTypeCoder(){}
+
+ public static BeamSQLRecordTypeCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(BeamSQLRecordType value, OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
+ Context nested = context.nested();
+ intCoder.encode(value.size(), outStream, nested);
+ for(String fieldName : value.getFieldsName()){
+ stringCoder.encode(fieldName, outStream, nested);
+ }
+ for(SqlTypeName fieldType : value.getFieldsType()){
+ stringCoder.encode(fieldType.name(), outStream, nested);
+ }
+ outStream.flush();
+ }
+
+ @Override
+ public BeamSQLRecordType decode(InputStream inStream,
+ org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
+ BeamSQLRecordType typeRecord = new BeamSQLRecordType();
+ Context nested = context.nested();
+ int size = intCoder.decode(inStream, nested);
+ for(int idx=0; idx<size; ++idx){
+ typeRecord.getFieldsName().add(stringCoder.decode(inStream, nested));
+ }
+ for(int idx=0; idx<size; ++idx){
+ typeRecord.getFieldsType().add(SqlTypeName.valueOf(stringCoder.decode(inStream, nested)));
+ }
+ return typeRecord;
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void verifyDeterministic()
+ throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
new file mode 100644
index 0000000..db93168
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Repersent a generic ROW record in Beam SQL.
+ *
+ */
+public class BeamSQLRow implements Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 4569220242480160895L;
+
+ private List<Integer> nullFields = new ArrayList<>();
+ private List<Object> dataValues;
+ private BeamSQLRecordType dataType;
+
+ public BeamSQLRow(BeamSQLRecordType dataType) {
+ this.dataType = dataType;
+ this.dataValues = new ArrayList<>();
+ for(int idx=0; idx<dataType.size(); ++idx){
+ dataValues.add(null);
+ }
+ }
+
+ public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) {
+ this.dataValues = dataValues;
+ this.dataType = dataType;
+ }
+
+ public void addField(String fieldName, Object fieldValue) {
+ addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+ }
+
+ public void addField(int index, Object fieldValue) {
+ if(fieldValue == null){
+ dataValues.set(index, fieldValue);
+ if(!nullFields.contains(index)){nullFields.add(index);}
+ return;
+ }
+
+ SqlTypeName fieldType = dataType.getFieldsType().get(index);
+ switch (fieldType) {
+ case INTEGER:
+ case SMALLINT:
+ case TINYINT:
+ if(!(fieldValue instanceof Integer)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case DOUBLE:
+ if(!(fieldValue instanceof Double)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case BIGINT:
+ if(!(fieldValue instanceof Long)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case FLOAT:
+ if(!(fieldValue instanceof Float)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case VARCHAR:
+ if(!(fieldValue instanceof String)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case TIME:
+ case TIMESTAMP:
+ if(!(fieldValue instanceof Date)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ default:
+ throw new UnsupportedDataTypeException(fieldType);
+ }
+ dataValues.set(index, fieldValue);
+ }
+
+
+ public int getInteger(int idx) {
+ return (Integer) getFieldValue(idx);
+ }
+
+ public double getDouble(int idx) {
+ return (Double) getFieldValue(idx);
+ }
+
+ public long getLong(int idx) {
+ return (Long) getFieldValue(idx);
+ }
+
+ public String getString(int idx) {
+ return (String) getFieldValue(idx);
+ }
+
+ public Date getDate(int idx) {
+ return (Date) getFieldValue(idx);
+ }
+
+ public Object getFieldValue(String fieldName) {
+ return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+ }
+
+ public Object getFieldValue(int fieldIdx) {
+ if(nullFields.contains(fieldIdx)){
+ return null;
+ }
+
+ Object fieldValue = dataValues.get(fieldIdx);
+ SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx);
+
+ switch (fieldType) {
+ case INTEGER:
+ case SMALLINT:
+ case TINYINT:
+ if(!(fieldValue instanceof Integer)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return Integer.valueOf(fieldValue.toString());
+ }
+ case DOUBLE:
+ if(!(fieldValue instanceof Double)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return Double.valueOf(fieldValue.toString());
+ }
+ case BIGINT:
+ if(!(fieldValue instanceof Long)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return Long.valueOf(fieldValue.toString());
+ }
+ case FLOAT:
+ if(!(fieldValue instanceof Float)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return Float.valueOf(fieldValue.toString());
+ }
+ case VARCHAR:
+ if(!(fieldValue instanceof String)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return fieldValue.toString();
+ }
+ case TIME:
+ case TIMESTAMP:
+ if(!(fieldValue instanceof Date)){
+ throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }else{
+ return fieldValue;
+ }
+ default:
+ throw new UnsupportedDataTypeException(fieldType);
+ }
+ }
+
+ public int size() {
+ return dataValues.size();
+ }
+
+ public List<Object> getDataValues() {
+ return dataValues;
+ }
+
+ public void setDataValues(List<Object> dataValues) {
+ this.dataValues = dataValues;
+ }
+
+ public BeamSQLRecordType getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(BeamSQLRecordType dataType) {
+ this.dataType = dataType;
+ }
+
+ public void setNullFields(List<Integer> nullFields) {
+ this.nullFields = nullFields;
+ }
+
+ public List<Integer> getNullFields() {
+ return nullFields;
+ }
+
+ @Override
+ public String toString() {
+ return "BeamSQLRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
+ }
+
+ /**
+ * Return data fields as key=value.
+ */
+ public String valueInString() {
+ StringBuffer sb = new StringBuffer();
+ for (int idx = 0; idx < size(); ++idx) {
+ sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
+ }
+ return sb.substring(1);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ BeamSQLRow other = (BeamSQLRow) obj;
+ return toString().equals(other.toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
new file mode 100644
index 0000000..00af18d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * A {@link Coder} encodes {@link BeamSQLRow}.
+ *
+ */
+public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
+ private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of();
+
+ private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
+
+ private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+ private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
+ private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+ private static final DoubleCoder doubleCoder = DoubleCoder.of();
+
+ private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
+ private BeamSqlRowCoder(){}
+
+ public static BeamSqlRowCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(BeamSQLRow value, OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
+ recordTypeCoder.encode(value.getDataType(), outStream, context);
+ listCoder.encode(value.getNullFields(), outStream, context);
+
+ Context nested = context.nested();
+
+ for (int idx = 0; idx < value.size(); ++idx) {
+ if(value.getNullFields().contains(idx)){
+ continue;
+ }
+
+ switch (value.getDataType().getFieldsType().get(idx)) {
+ case INTEGER:
+ case SMALLINT:
+ case TINYINT:
+ intCoder.encode(value.getInteger(idx), outStream, nested);
+ break;
+ case DOUBLE:
+ case FLOAT:
+ doubleCoder.encode(value.getDouble(idx), outStream, nested);
+ break;
+ case BIGINT:
+ longCoder.encode(value.getLong(idx), outStream, nested);
+ break;
+ case VARCHAR:
+ stringCoder.encode(value.getString(idx), outStream, nested);
+ break;
+ case TIME:
+ case TIMESTAMP:
+ longCoder.encode(value.getDate(idx).getTime(), outStream, nested);
+ break;
+
+ default:
+ throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx));
+ }
+ }
+ }
+
+ @Override
+ public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ BeamSQLRecordType type = recordTypeCoder.decode(inStream, context);
+ List<Integer> nullFields = listCoder.decode(inStream, context);
+
+ BeamSQLRow record = new BeamSQLRow(type);
+ record.setNullFields(nullFields);
+
+ for (int idx = 0; idx < type.size(); ++idx) {
+ if(nullFields.contains(idx)){
+ continue;
+ }
+
+ switch (type.getFieldsType().get(idx)) {
+ case INTEGER:
+ case SMALLINT:
+ case TINYINT:
+ record.addField(idx, intCoder.decode(inStream, context));
+ break;
+ case DOUBLE:
+ case FLOAT:
+ record.addField(idx, doubleCoder.decode(inStream, context));
+ break;
+ case BIGINT:
+ record.addField(idx, longCoder.decode(inStream, context));
+ break;
+ case VARCHAR:
+ record.addField(idx, stringCoder.decode(inStream, context));
+ break;
+ case TIME:
+ case TIMESTAMP:
+ record.addField(idx, new Date(longCoder.decode(inStream, context)));
+ break;
+
+ default:
+ throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
+ }
+ }
+
+ return record;
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return null;
+ }
+
+ @Override
+ public void verifyDeterministic()
+ throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
new file mode 100644
index 0000000..6240426
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+public class InvalidFieldException extends RuntimeException {
+
+ public InvalidFieldException() {
+ super();
+ }
+
+ public InvalidFieldException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
new file mode 100644
index 0000000..9a2235e
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+
+public class UnsupportedDataTypeException extends RuntimeException {
+
+ public UnsupportedDataTypeException(SqlTypeName unsupportedType){
+ super(String.format("Not support data type [%s]", unsupportedType));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
new file mode 100644
index 0000000..2570763
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Kafka topic that saves records as CSV format.
+ *
+ */
+public class BeamKafkaCSVTable extends BeamKafkaTable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 4754022536543333984L;
+
+ public static final String DELIMITER = ",";
+ private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaCSVTable.class);
+
+ public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers,
+ List<String> topics) {
+ super(protoRowType, bootstrapServers, topics);
+ }
+
+ @Override
+ public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
+ getPTransformForInput() {
+ return new CsvRecorderDecoder(beamSqlRecordType);
+ }
+
+ @Override
+ public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
+ getPTransformForOutput() {
+ return new CsvRecorderEncoder(beamSqlRecordType);
+ }
+
+ /**
+ * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSQLRow}.
+ *
+ */
+ public static class CsvRecorderDecoder
+ extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> {
+ private BeamSQLRecordType recordType;
+
+ public CsvRecorderDecoder(BeamSQLRecordType recordType) {
+ this.recordType = recordType;
+ }
+
+ @Override
+ public PCollection<BeamSQLRow> expand(PCollection<KV<byte[], byte[]>> input) {
+ return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSQLRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ String rowInString = new String(c.element().getValue());
+ String[] parts = rowInString.split(BeamKafkaCSVTable.DELIMITER);
+ if (parts.length != recordType.size()) {
+ LOG.error(String.format("invalid record: ", rowInString));
+ } else {
+ BeamSQLRow sourceRecord = new BeamSQLRow(recordType);
+ for (int idx = 0; idx < parts.length; ++idx) {
+ sourceRecord.addField(idx, parts[idx]);
+ }
+ c.output(sourceRecord);
+ }
+ }
+ }));
+ }
+ }
+
+ /**
+ * A PTransform to convert {@link BeamSQLRow} to {@code KV<byte[], byte[]>}.
+ *
+ */
+ public static class CsvRecorderEncoder
+ extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> {
+ private BeamSQLRecordType recordType;
+
+ public CsvRecorderEncoder(BeamSQLRecordType recordType) {
+ this.recordType = recordType;
+ }
+
+ @Override
+ public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSQLRow> input) {
+ return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, KV<byte[], byte[]>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ BeamSQLRow in = c.element();
+ StringBuffer sb = new StringBuffer();
+ for (int idx = 0; idx < in.size(); ++idx) {
+ sb.append(DELIMITER);
+ sb.append(in.getFieldValue(idx).toString());
+ }
+ c.output(KV.of(new byte[] {}, sb.substring(1).getBytes()));
+ }
+ }));
+
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
new file mode 100644
index 0000000..482383b
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.schema.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamIOType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
+ * extend to convert between {@code BeamSQLRow} and {@code KV<byte[], byte[]>}.
+ *
+ */
+public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -634715473399906527L;
+
+ private String bootstrapServers;
+ private List<String> topics;
+ private Map<String, Object> configUpdates;
+
+ protected BeamKafkaTable(RelProtoDataType protoRowType) {
+ super(protoRowType);
+ }
+
+ public BeamKafkaTable(RelProtoDataType protoRowType, String bootstrapServers,
+ List<String> topics) {
+ super(protoRowType);
+ this.bootstrapServers = bootstrapServers;
+ this.topics = topics;
+ }
+
+ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
+ this.configUpdates = configUpdates;
+ return this;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.UNBOUNDED;
+ }
+
+ public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
+ getPTransformForInput();
+
+ public abstract PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
+ getPTransformForOutput();
+
+ @Override
+ public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
+ return new PTransform<PBegin, PCollection<BeamSQLRow>>() {
+
+ @Override
+ public PCollection<BeamSQLRow> expand(PBegin input) {
+ return input.apply("read",
+ KafkaIO.<byte[], byte[]>read().withBootstrapServers(bootstrapServers).withTopics(topics)
+ .updateConsumerProperties(configUpdates).withKeyCoder(ByteArrayCoder.of())
+ .withValueCoder(ByteArrayCoder.of()).withoutMetadata())
+ .apply("in_format", getPTransformForInput());
+
+ }
+ };
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+ checkArgument(topics != null && topics.size() == 1,
+ "Only one topic can be acceptable as output.");
+
+ return new PTransform<PCollection<BeamSQLRow>, PDone>() {
+ @Override
+ public PDone expand(PCollection<BeamSQLRow> input) {
+ return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
+ KafkaIO.<byte[], byte[]>write().withBootstrapServers(bootstrapServers)
+ .withTopic(topics.get(0)).withKeyCoder(ByteArrayCoder.of())
+ .withValueCoder(ByteArrayCoder.of()));
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
new file mode 100644
index 0000000..822fce7
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * table schema for KafkaIO.
+ */
+package org.beam.dsls.sql.schema.kafka;
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
new file mode 100644
index 0000000..ef9cc7d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * define table schema, to map with Beam IO components.
+ *
+ */
+package org.beam.dsls.sql.schema;
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
new file mode 100644
index 0000000..06db280
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.transform;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.rel.BeamFilterRel;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * {@code BeamSQLFilterFn} is the executor for a {@link BeamFilterRel} step.
+ *
+ */
+public class BeamSQLFilterFn extends DoFn<BeamSQLRow, BeamSQLRow> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -1256111753670606705L;
+
+ private String stepName;
+ private BeamSQLExpressionExecutor executor;
+
+ public BeamSQLFilterFn(String stepName, BeamSQLExpressionExecutor executor) {
+ super();
+ this.stepName = stepName;
+ this.executor = executor;
+ }
+
+ @Setup
+ public void setup() {
+ executor.prepare();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ BeamSQLRow in = c.element();
+
+ List<Object> result = executor.execute(in);
+
+ if ((Boolean) result.get(0)) {
+ c.output(in);
+ }
+ }
+
+ @Teardown
+ public void close() {
+ executor.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
new file mode 100644
index 0000000..1014c0d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.transform;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * A test PTransform to display output in console.
+ *
+ */
+public class BeamSQLOutputToConsoleFn extends DoFn<BeamSQLRow, Void> {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -1256111753670606705L;
+
+ private String stepName;
+
+ public BeamSQLOutputToConsoleFn(String stepName) {
+ super();
+ this.stepName = stepName;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ System.out.println("Output: " + c.element().getDataValues());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
new file mode 100644
index 0000000..12061d2
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.transform;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.rel.BeamProjectRel;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ *
+ * {@code BeamSQLProjectFn} is the executor for a {@link BeamProjectRel} step.
+ *
+ */
+public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -1046605249999014608L;
+ private String stepName;
+ private BeamSQLExpressionExecutor executor;
+ private BeamSQLRecordType outputRecordType;
+
+ public BeamSQLProjectFn(String stepName, BeamSQLExpressionExecutor executor,
+ BeamSQLRecordType outputRecordType) {
+ super();
+ this.stepName = stepName;
+ this.executor = executor;
+ this.outputRecordType = outputRecordType;
+ }
+
+ @Setup
+ public void setup() {
+ executor.prepare();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ List<Object> results = executor.execute(c.element());
+
+ BeamSQLRow outRow = new BeamSQLRow(outputRecordType);
+ for (int idx = 0; idx < results.size(); ++idx) {
+ outRow.addField(idx, results.get(idx));
+ }
+
+ c.output(outRow);
+ }
+
+ @Teardown
+ public void close() {
+ executor.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
new file mode 100644
index 0000000..2607abf
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSQL pipeline.
+ */
+package org.beam.dsls.sql.transform;
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/resources/log4j.properties b/dsls/sql/src/main/resources/log4j.properties
new file mode 100644
index 0000000..709484b
--- /dev/null
+++ b/dsls/sql/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=ERROR,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
new file mode 100644
index 0000000..56e45c4
--- /dev/null
+++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BasePlanner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+import org.junit.BeforeClass;
+
+/**
+ * prepare {@code BeamSqlRunner} for test.
+ *
+ */
+public class BasePlanner {
+ public static BeamSqlRunner runner = new BeamSqlRunner();
+
+ @BeforeClass
+ public static void prepare() {
+ runner.addTable("ORDER_DETAILS", getTable());
+ runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+ runner.addTable("SUB_ORDER_RAM", getTable());
+ }
+
+ private static BaseBeamTable getTable() {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+ }
+ };
+
+ return new MockedBeamSQLTable(protoRowType);
+ }
+
+ public static BaseBeamTable getTable(String bootstrapServer, String topic) {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+ }
+ };
+
+ Map<String, Object> consumerPara = new HashMap<String, Object>();
+ consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+
+ return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic))
+ .updateConsumerProperties(consumerPara);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
new file mode 100644
index 0000000..a77878f
--- /dev/null
+++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerExplainTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests to explain queries.
+ *
+ */
+public class BeamPlannerExplainTest extends BasePlanner {
+
+ @Test
+ public void selectAll() throws Exception {
+ String sql = "SELECT * FROM ORDER_DETAILS";
+ String plan = runner.explainQuery(sql);
+
+ String expectedPlan =
+ "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n"
+ + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+ Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+ }
+
+ @Test
+ public void selectWithFilter() throws Exception {
+ String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 and price > 20";
+ String plan = runner.explainQuery(sql);
+
+ String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+ + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+ + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+ Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+ }
+
+ @Test
+ public void insertSelectFilter() throws Exception {
+ String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
+ + " order_id, site_id, price " + "FROM ORDER_DETAILS "
+ + "WHERE SITE_ID = 0 and price > 20";
+ String plan = runner.explainQuery(sql);
+
+ String expectedPlan =
+ "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n"
+ + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n"
+ + " BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
+ + " BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
+ + " BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
+ Assert.assertEquals("explain doesn't match", expectedPlan, plan);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
new file mode 100644
index 0000000..eb097a9
--- /dev/null
+++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import org.apache.beam.sdk.Pipeline;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests to execute a query.
+ *
+ */
+public class BeamPlannerSubmitTest extends BasePlanner {
+ @Test
+ public void insertSelectFilter() throws Exception {
+ String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
+ + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
+ Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+ runner.getPlanner().planner.close();
+
+ pipeline.run().waitUntilFinish();
+
+ Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
+ Assert.assertEquals("order_id=12345,site_id=0,price=20.5,order_time=null", MockedBeamSQLTable.CONTENT.get(0));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
new file mode 100644
index 0000000..31f5578
--- /dev/null
+++ b/dsls/sql/src/test/java/org/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamIOType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * A mock table use to check input/output.
+ *
+ */
+public class MockedBeamSQLTable extends BaseBeamTable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1373168368414036932L;
+
+ public static final List<String> CONTENT = new ArrayList<>();
+
+ public MockedBeamSQLTable(RelProtoDataType protoRowType) {
+ super(protoRowType);
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.UNBOUNDED;
+ }
+
+ @Override
+ public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
+ BeamSQLRow row1 = new BeamSQLRow(beamSqlRecordType);
+ row1.addField(0, 12345L);
+ row1.addField(1, 0);
+ row1.addField(2, 10.5);
+ row1.addField(3, new Date());
+
+ BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType);
+ row2.addField(0, 12345L);
+ row2.addField(1, 1);
+ row2.addField(2, 20.5);
+ row2.addField(3, new Date());
+
+ BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType);
+ row3.addField(0, 12345L);
+ row3.addField(1, 0);
+ row3.addField(2, 20.5);
+ row3.addField(3, new Date());
+
+ BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType);
+ row4.addField(0, null);
+ row4.addField(1, null);
+ row4.addField(2, 20.5);
+ row4.addField(3, new Date());
+
+ return Create.of(row1, row2, row3);
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+ return new OutputStore();
+ }
+
+ /**
+ * Keep output in {@code CONTENT} for validation.
+ *
+ */
+ public static class OutputStore extends PTransform<PCollection<BeamSQLRow>, PDone> {
+
+ @Override
+ public PDone expand(PCollection<BeamSQLRow> input) {
+ input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() {
+
+ @Setup
+ public void setup() {
+ CONTENT.clear();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ CONTENT.add(c.element().valueInString());
+ }
+
+ @Teardown
+ public void close() {
+
+ }
+
+ }));
+ return PDone.in(input.getPipeline());
+ }
+
+ }
+
+}
[3/3] beam git commit: [BEAM-301] This closes #2479
Posted by jb...@apache.org.
[BEAM-301] This closes #2479
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1c2b654
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1c2b654
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1c2b654
Branch: refs/heads/DSL_SQL
Commit: f1c2b6540652156ad7c51ec89a933aacb8c6dab6
Parents: 3625dbd 7867ce6
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Wed Apr 12 22:00:53 2017 +0200
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Wed Apr 12 22:00:53 2017 +0200
----------------------------------------------------------------------
dsls/pom.xml | 5 +-
dsls/sql/README.md | 24 ++
dsls/sql/pom.xml | 166 +++++++++++++
.../beam/dsls/sql/example/BeamSqlExample.java | 102 ++++++++
.../org/beam/dsls/sql/example/package-info.java | 23 ++
.../interpreter/BeamSQLExpressionExecutor.java | 43 ++++
.../sql/interpreter/BeamSQLSpELExecutor.java | 126 ++++++++++
.../dsls/sql/interpreter/CalciteToSpEL.java | 80 ++++++
.../beam/dsls/sql/interpreter/package-info.java | 22 ++
.../java/org/beam/dsls/sql/package-info.java | 22 ++
.../dsls/sql/planner/BeamPipelineCreator.java | 85 +++++++
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 157 ++++++++++++
.../dsls/sql/planner/BeamRelDataTypeSystem.java | 40 +++
.../org/beam/dsls/sql/planner/BeamRuleSets.java | 65 +++++
.../beam/dsls/sql/planner/BeamSQLRelUtils.java | 73 ++++++
.../beam/dsls/sql/planner/BeamSqlRunner.java | 93 +++++++
.../planner/BeamSqlUnsupportedException.java | 38 +++
.../planner/UnsupportedOperatorsVisitor.java | 28 +++
.../org/beam/dsls/sql/planner/package-info.java | 24 ++
.../org/beam/dsls/sql/rel/BeamFilterRel.java | 71 ++++++
.../org/beam/dsls/sql/rel/BeamIOSinkRel.java | 75 ++++++
.../org/beam/dsls/sql/rel/BeamIOSourceRel.java | 59 +++++
.../dsls/sql/rel/BeamLogicalConvention.java | 72 ++++++
.../org/beam/dsls/sql/rel/BeamProjectRel.java | 82 +++++++
.../java/org/beam/dsls/sql/rel/BeamRelNode.java | 38 +++
.../org/beam/dsls/sql/rel/package-info.java | 23 ++
.../org/beam/dsls/sql/rule/BeamFilterRule.java | 49 ++++
.../org/beam/dsls/sql/rule/BeamIOSinkRule.java | 81 +++++++
.../beam/dsls/sql/rule/BeamIOSourceRule.java | 49 ++++
.../org/beam/dsls/sql/rule/BeamProjectRule.java | 50 ++++
.../org/beam/dsls/sql/rule/package-info.java | 22 ++
.../org/beam/dsls/sql/schema/BaseBeamTable.java | 99 ++++++++
.../org/beam/dsls/sql/schema/BeamIOType.java | 28 +++
.../beam/dsls/sql/schema/BeamSQLRecordType.java | 74 ++++++
.../dsls/sql/schema/BeamSQLRecordTypeCoder.java | 88 +++++++
.../org/beam/dsls/sql/schema/BeamSQLRow.java | 242 +++++++++++++++++++
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 149 ++++++++++++
.../dsls/sql/schema/InvalidFieldException.java | 30 +++
.../schema/UnsupportedDataTypeException.java | 28 +++
.../sql/schema/kafka/BeamKafkaCSVTable.java | 127 ++++++++++
.../dsls/sql/schema/kafka/BeamKafkaTable.java | 111 +++++++++
.../dsls/sql/schema/kafka/package-info.java | 22 ++
.../org/beam/dsls/sql/schema/package-info.java | 23 ++
.../dsls/sql/transform/BeamSQLFilterFn.java | 66 +++++
.../sql/transform/BeamSQLOutputToConsoleFn.java | 45 ++++
.../dsls/sql/transform/BeamSQLProjectFn.java | 72 ++++++
.../beam/dsls/sql/transform/package-info.java | 22 ++
dsls/sql/src/main/resources/log4j.properties | 23 ++
.../org/beam/dsls/sql/planner/BasePlanner.java | 74 ++++++
.../sql/planner/BeamPlannerExplainTest.java | 68 ++++++
.../dsls/sql/planner/BeamPlannerSubmitTest.java | 42 ++++
.../dsls/sql/planner/MockedBeamSQLTable.java | 123 ++++++++++
52 files changed, 3441 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: [BEAM-301] Initial skeleton for Beam SQL
Posted by jb...@apache.org.
[BEAM-301] Initial skeleton for Beam SQL
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7867ce62
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7867ce62
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7867ce62
Branch: refs/heads/DSL_SQL
Commit: 7867ce62e43bef7bfd8011c605379df05494dfcf
Parents: 3625dbd
Author: mingmxu <mi...@ebay.com>
Authored: Sun Apr 9 19:49:08 2017 -0700
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Wed Apr 12 21:59:46 2017 +0200
----------------------------------------------------------------------
dsls/pom.xml | 5 +-
dsls/sql/README.md | 24 ++
dsls/sql/pom.xml | 166 +++++++++++++
.../beam/dsls/sql/example/BeamSqlExample.java | 102 ++++++++
.../org/beam/dsls/sql/example/package-info.java | 23 ++
.../interpreter/BeamSQLExpressionExecutor.java | 43 ++++
.../sql/interpreter/BeamSQLSpELExecutor.java | 126 ++++++++++
.../dsls/sql/interpreter/CalciteToSpEL.java | 80 ++++++
.../beam/dsls/sql/interpreter/package-info.java | 22 ++
.../java/org/beam/dsls/sql/package-info.java | 22 ++
.../dsls/sql/planner/BeamPipelineCreator.java | 85 +++++++
.../beam/dsls/sql/planner/BeamQueryPlanner.java | 157 ++++++++++++
.../dsls/sql/planner/BeamRelDataTypeSystem.java | 40 +++
.../org/beam/dsls/sql/planner/BeamRuleSets.java | 65 +++++
.../beam/dsls/sql/planner/BeamSQLRelUtils.java | 73 ++++++
.../beam/dsls/sql/planner/BeamSqlRunner.java | 93 +++++++
.../planner/BeamSqlUnsupportedException.java | 38 +++
.../planner/UnsupportedOperatorsVisitor.java | 28 +++
.../org/beam/dsls/sql/planner/package-info.java | 24 ++
.../org/beam/dsls/sql/rel/BeamFilterRel.java | 71 ++++++
.../org/beam/dsls/sql/rel/BeamIOSinkRel.java | 75 ++++++
.../org/beam/dsls/sql/rel/BeamIOSourceRel.java | 59 +++++
.../dsls/sql/rel/BeamLogicalConvention.java | 72 ++++++
.../org/beam/dsls/sql/rel/BeamProjectRel.java | 82 +++++++
.../java/org/beam/dsls/sql/rel/BeamRelNode.java | 38 +++
.../org/beam/dsls/sql/rel/package-info.java | 23 ++
.../org/beam/dsls/sql/rule/BeamFilterRule.java | 49 ++++
.../org/beam/dsls/sql/rule/BeamIOSinkRule.java | 81 +++++++
.../beam/dsls/sql/rule/BeamIOSourceRule.java | 49 ++++
.../org/beam/dsls/sql/rule/BeamProjectRule.java | 50 ++++
.../org/beam/dsls/sql/rule/package-info.java | 22 ++
.../org/beam/dsls/sql/schema/BaseBeamTable.java | 99 ++++++++
.../org/beam/dsls/sql/schema/BeamIOType.java | 28 +++
.../beam/dsls/sql/schema/BeamSQLRecordType.java | 74 ++++++
.../dsls/sql/schema/BeamSQLRecordTypeCoder.java | 88 +++++++
.../org/beam/dsls/sql/schema/BeamSQLRow.java | 242 +++++++++++++++++++
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 149 ++++++++++++
.../dsls/sql/schema/InvalidFieldException.java | 30 +++
.../schema/UnsupportedDataTypeException.java | 28 +++
.../sql/schema/kafka/BeamKafkaCSVTable.java | 127 ++++++++++
.../dsls/sql/schema/kafka/BeamKafkaTable.java | 111 +++++++++
.../dsls/sql/schema/kafka/package-info.java | 22 ++
.../org/beam/dsls/sql/schema/package-info.java | 23 ++
.../dsls/sql/transform/BeamSQLFilterFn.java | 66 +++++
.../sql/transform/BeamSQLOutputToConsoleFn.java | 45 ++++
.../dsls/sql/transform/BeamSQLProjectFn.java | 72 ++++++
.../beam/dsls/sql/transform/package-info.java | 22 ++
dsls/sql/src/main/resources/log4j.properties | 23 ++
.../org/beam/dsls/sql/planner/BasePlanner.java | 74 ++++++
.../sql/planner/BeamPlannerExplainTest.java | 68 ++++++
.../dsls/sql/planner/BeamPlannerSubmitTest.java | 42 ++++
.../dsls/sql/planner/MockedBeamSQLTable.java | 123 ++++++++++
52 files changed, 3441 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/pom.xml b/dsls/pom.xml
index 6e00171..a1bb0ee 100644
--- a/dsls/pom.xml
+++ b/dsls/pom.xml
@@ -27,10 +27,11 @@
</parent>
<artifactId>beam-dsls-parent</artifactId>
+ <packaging>pom</packaging>
<name>Apache Beam :: DSLs</name>
<modules>
- <!-- <module>sql</module> -->
+ <module>sql</module>
</modules>
<build>
@@ -53,4 +54,4 @@
</pluginManagement>
</build>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/README.md
----------------------------------------------------------------------
diff --git a/dsls/sql/README.md b/dsls/sql/README.md
new file mode 100644
index 0000000..ae9e0f3
--- /dev/null
+++ b/dsls/sql/README.md
@@ -0,0 +1,24 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Beam SQL
+
+Beam SQL provides a new interface, to execute a SQL query as a Beam pipeline.
+
+*It's working in progress...*
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
new file mode 100644
index 0000000..21c8def
--- /dev/null
+++ b/dsls/sql/pom.xml
@@ -0,0 +1,166 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-dsls-parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>beam-dsls-sql</artifactId>
+ <name>Apache Beam :: DSLs :: SQL</name>
+ <description>Beam SQL provides a new interface to generate a Beam pipeline from SQL statement</description>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <timestamp>${maven.build.timestamp}</timestamp>
+ <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
+ <calcite-version>1.11.0</calcite-version>
+ </properties>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <!-- Set testSourceDirectory in order to exclude generated-test-sources -->
+ <testSourceDirectory>${project.basedir}/src/test/</testSourceDirectory>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>-da</argLine> <!-- disable assert in Calcite converter validation -->
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>bundle-and-repackage</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>true</shadeTestJar>
+ <artifactSet>
+ <includes>
+ <include>com.google.guava:guava</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Coverage analysis for unit tests. -->
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <version>${calcite-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-kafka</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-expression</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-linq4j</artifactId>
+ <version>${calcite-version}</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java
new file mode 100644
index 0000000..7fb8def
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/example/BeamSqlExample.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.example;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.beam.dsls.sql.planner.BeamSqlRunner;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+
+/**
+ * This is one quick example.
+ * <p>Before start, follow https://kafka.apache.org/quickstart to setup a Kafka
+ * cluster locally, and run below commands to create required Kafka topics:
+ * <pre>
+ * <code>
+ * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
+ * --partitions 1 --topic orders
+ * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
+ * --partitions 1 --topic sub_orders
+ * </code>
+ * </pre>
+ * After run the application, produce several test records:
+ * <pre>
+ * <code>
+ * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic orders
+ * invalid,record
+ * 123445,0,100,3413423
+ * 234123,3,232,3451231234
+ * 234234,0,5,1234123
+ * 345234,0,345234.345,3423
+ * </code>
+ * </pre>
+ * Meanwhile, open another console to see the output:
+ * <pre>
+ * <code>
+ * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sub_orders
+ * **Expected :
+ * 123445,0,100.0
+ * 345234,0,345234.345
+ * </code>
+ * </pre>
+ */
+public class BeamSqlExample implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 3673487843555563904L;
+
+ public static void main(String[] args) throws Exception {
+ BeamSqlRunner runner = new BeamSqlRunner();
+ runner.addTable("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders"));
+ runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+
+ // case 2: insert into <table>(<fields>) select STREAM <fields> from
+ // <table> from <clause>
+ String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
+ + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
+
+ runner.explainQuery(sql);
+ runner.submitQuery(sql);
+ }
+
+ public static BaseBeamTable getTable(String bootstrapServer, String topic) {
+ final RelProtoDataType protoRowType = new RelProtoDataType() {
+ @Override
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
+ .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+ }
+ };
+
+ Map<String, Object> consumerPara = new HashMap<String, Object>();
+ consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+
+ return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic))
+ .updateConsumerProperties(consumerPara);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java
new file mode 100644
index 0000000..ae678e4
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/example/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * examples on how to use BeamSQL.
+ *
+ */
+package org.beam.dsls.sql.example;
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
new file mode 100644
index 0000000..e9d425d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.interpreter;
+
+import java.io.Serializable;
+import java.util.List;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * {@code BeamSQLExpressionExecutor} fills the gap between relational
+ * expressions in Calcite SQL and executable code.
+ *
+ */
+public interface BeamSQLExpressionExecutor extends Serializable {
+
+ /**
+ * invoked before data processing.
+ */
+ void prepare();
+
+ /**
+ * apply transformation to input record {@link BeamSQLRow}.
+ *
+ */
+ List<Object> execute(BeamSQLRow inputRecord);
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
new file mode 100644
index 0000000..48306da
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/BeamSQLSpELExecutor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.interpreter;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.planner.BeamSqlUnsupportedException;
+import org.beam.dsls.sql.rel.BeamFilterRel;
+import org.beam.dsls.sql.rel.BeamProjectRel;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
+import org.springframework.expression.spel.SpelParserConfiguration;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
+
+/**
+ * {@code BeamSQLSpELExecutor} is one implementation, to convert Calcite SQL
+ * relational expression to SpEL expression.
+ *
+ */
+public class BeamSQLSpELExecutor implements BeamSQLExpressionExecutor {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 6777232573390074408L;
+
+ private List<String> spelString;
+ private List<Expression> spelExpressions;
+
+ public BeamSQLSpELExecutor(BeamRelNode relNode) {
+ this.spelString = new ArrayList<>();
+ if (relNode instanceof BeamFilterRel) {
+ String filterSpEL = CalciteToSpEL
+ .rexcall2SpEL((RexCall) ((BeamFilterRel) relNode).getCondition());
+ spelString.add(filterSpEL);
+ } else if (relNode instanceof BeamProjectRel) {
+ spelString.addAll(createProjectExps((BeamProjectRel) relNode));
+ // List<ProjectRule> projectRules =
+ // for (int idx = 0; idx < projectRules.size(); ++idx) {
+ // spelString.add(projectRules.get(idx).getProjectExp());
+ // }
+ } else {
+ throw new BeamSqlUnsupportedException(
+ String.format("%s is not supported yet", relNode.getClass().toString()));
+ }
+ }
+
+ @Override
+ public void prepare() {
+ this.spelExpressions = new ArrayList<>();
+
+ SpelParserConfiguration config = new SpelParserConfiguration(true, true);
+ ExpressionParser parser = new SpelExpressionParser(config);
+ for (String el : spelString) {
+ spelExpressions.add(parser.parseExpression(el));
+ }
+ }
+
+ @Override
+ public List<Object> execute(BeamSQLRow inputRecord) {
+ StandardEvaluationContext inContext = new StandardEvaluationContext();
+ inContext.setVariable("in", inputRecord);
+
+ List<Object> results = new ArrayList<>();
+ for (Expression ep : spelExpressions) {
+ results.add(ep.getValue(inContext));
+ }
+ return results;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ private List<String> createProjectExps(BeamProjectRel projectRel) {
+ List<String> rules = new ArrayList<>();
+
+ List<RexNode> exps = projectRel.getProjects();
+
+ for (int idx = 0; idx < exps.size(); ++idx) {
+ RexNode node = exps.get(idx);
+ if (node == null) {
+ rules.add("null");
+ }
+
+ if (node instanceof RexLiteral) {
+ rules.add(((RexLiteral) node).getValue() + "");
+ } else {
+ if (node instanceof RexInputRef) {
+ rules.add("#in.getFieldValue(" + ((RexInputRef) node).getIndex() + ")");
+ }
+ if (node instanceof RexCall) {
+ rules.add(CalciteToSpEL.rexcall2SpEL((RexCall) node));
+ }
+ }
+ }
+
+ checkArgument(rules.size() == exps.size(), "missing projects rules after conversion.");
+
+ return rules;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java
new file mode 100644
index 0000000..c7cbace
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/CalciteToSpEL.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.interpreter;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.planner.BeamSqlUnsupportedException;
+
+/**
+ * {@code CalciteToSpEL} is used in {@link BeamSQLSpELExecutor}, to convert a
+ * relational expression {@link RexCall} to SpEL expression.
+ *
+ */
+public class CalciteToSpEL {
+
+ public static String rexcall2SpEL(RexCall cdn) {
+ List<String> parts = new ArrayList<>();
+ for (RexNode subcdn : cdn.operands) {
+ if (subcdn instanceof RexCall) {
+ parts.add(rexcall2SpEL((RexCall) subcdn));
+ } else {
+ parts.add(subcdn instanceof RexInputRef
+ ? "#in.getFieldValue(" + ((RexInputRef) subcdn).getIndex() + ")" : subcdn.toString());
+ }
+ }
+
+ String opName = cdn.op.getName();
+ switch (cdn.op.getClass().getSimpleName()) {
+ case "SqlMonotonicBinaryOperator": // +-*
+ case "SqlBinaryOperator": // > < = >= <= <> OR AND || / .
+ switch (cdn.op.getName().toUpperCase()) {
+ case "AND":
+ return String.format(" ( %s ) ", Joiner.on("&&").join(parts));
+ case "OR":
+ return String.format(" ( %s ) ", Joiner.on("||").join(parts));
+ case "=":
+ return String.format(" ( %s ) ", Joiner.on("==").join(parts));
+ case "<>":
+ return String.format(" ( %s ) ", Joiner.on("!=").join(parts));
+ default:
+ return String.format(" ( %s ) ", Joiner.on(cdn.op.getName().toUpperCase()).join(parts));
+ }
+ case "SqlCaseOperator": // CASE
+ return String.format(" (%s ? %s : %s)", parts.get(0), parts.get(1), parts.get(2));
+ case "SqlCastFunction": // CAST
+ return parts.get(0);
+ case "SqlPostfixOperator":
+ switch (opName.toUpperCase()) {
+ case "IS NULL":
+ return String.format(" null == %s ", parts.get(0));
+ case "IS NOT NULL":
+ return String.format(" null != %s ", parts.get(0));
+ default:
+ throw new BeamSqlUnsupportedException();
+ }
+ default:
+ throw new BeamSqlUnsupportedException();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java
new file mode 100644
index 0000000..85235e2
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/interpreter/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * interpreter generate runnable 'code' to execute SQL relational expressions.
+ */
+package org.beam.dsls.sql.interpreter;
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java
new file mode 100644
index 0000000..c6f5cf6
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BeamSQL provides a new interface to run a SQL statement with Beam.
+ */
+package org.beam.dsls.sql;
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java
new file mode 100644
index 0000000..5a0c73d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamPipelineCreator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRecordTypeCoder;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.beam.dsls.sql.schema.BeamSqlRowCoder;
+
+/**
+ * {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam
+ * pipeline.
+ *
+ */
+public class BeamPipelineCreator {
+ private Map<String, BaseBeamTable> sourceTables;
+ private PCollection<BeamSQLRow> latestStream;
+
+ private PipelineOptions options;
+
+ private Pipeline pipeline;
+
+ private boolean hasPersistent = false;
+
+ public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables) {
+ this.sourceTables = sourceTables;
+
+ options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
+ .as(PipelineOptions.class); // FlinkPipelineOptions.class
+ options.setJobName("BeamPlanCreator");
+
+ pipeline = Pipeline.create(options);
+ CoderRegistry cr = pipeline.getCoderRegistry();
+ cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
+ cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());
+ }
+
+ public PCollection<BeamSQLRow> getLatestStream() {
+ return latestStream;
+ }
+
+ public void setLatestStream(PCollection<BeamSQLRow> latestStream) {
+ this.latestStream = latestStream;
+ }
+
+ public Map<String, BaseBeamTable> getSourceTables() {
+ return sourceTables;
+ }
+
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ public boolean isHasPersistent() {
+ return hasPersistent;
+ }
+
+ public void setHasPersistent(boolean hasPersistent) {
+ this.hasPersistent = hasPersistent;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java
new file mode 100644
index 0000000..a31ace0
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The core component to handle through a SQL statement, to submit a Beam
+ * pipeline.
+ *
+ */
+public class BeamQueryPlanner {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
+
+ protected final Planner planner;
+ private Map<String, BaseBeamTable> sourceTables = new HashMap<>();
+
+ public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+
+ public BeamQueryPlanner(SchemaPlus schema) {
+ final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+ traitDefs.add(ConventionTraitDef.INSTANCE);
+ traitDefs.add(RelCollationTraitDef.INSTANCE);
+
+ List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
+ sqlOperatorTables.add(SqlStdOperatorTable.instance());
+ sqlOperatorTables.add(new CalciteCatalogReader(CalciteSchema.from(schema), false,
+ Collections.<String>emptyList(), TYPE_FACTORY));
+
+ FrameworkConfig config = Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
+ .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
+ .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
+ this.planner = Frameworks.getPlanner(config);
+
+ for (String t : schema.getTableNames()) {
+ sourceTables.put(t, (BaseBeamTable) schema.getTable(t));
+ }
+ }
+
+ /**
+ * With a Beam pipeline generated in {@link #compileBeamPipeline(String)},
+ * submit it to run and wait until finish.
+ *
+ */
+ public void submitToRun(String sqlStatement) throws Exception {
+ Pipeline pipeline = compileBeamPipeline(sqlStatement);
+
+ PipelineResult result = pipeline.run();
+ result.waitUntilFinish();
+ }
+
+ /**
+ * With the @{@link BeamRelNode} tree generated in
+ * {@link #convertToBeamRel(String)}, a Beam pipeline is generated.
+ *
+ */
+ public Pipeline compileBeamPipeline(String sqlStatement) throws Exception {
+ BeamRelNode relNode = convertToBeamRel(sqlStatement);
+
+ BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables);
+ return relNode.buildBeamPipeline(planCreator);
+ }
+
+ /**
+ * It parses and validate the input query, then convert into a
+ * {@link BeamRelNode} tree.
+ *
+ */
+ public BeamRelNode convertToBeamRel(String sqlStatement)
+ throws ValidationException, RelConversionException, SqlParseException {
+ return (BeamRelNode) validateAndConvert(planner.parse(sqlStatement));
+ }
+
+ private RelNode validateAndConvert(SqlNode sqlNode)
+ throws ValidationException, RelConversionException {
+ SqlNode validated = validateNode(sqlNode);
+ LOG.info("SQL:\n" + validated);
+ RelNode relNode = convertToRelNode(validated);
+ return convertToBeamRel(relNode);
+ }
+
+ private RelNode convertToBeamRel(RelNode relNode) throws RelConversionException {
+ RelTraitSet traitSet = relNode.getTraitSet();
+
+ LOG.info("SQLPlan>\n" + RelOptUtil.toString(relNode));
+
+ // PlannerImpl.transform() optimizes RelNode with ruleset
+ return planner.transform(0, traitSet.plus(BeamLogicalConvention.INSTANCE), relNode);
+ }
+
+ private RelNode convertToRelNode(SqlNode sqlNode) throws RelConversionException {
+ return planner.rel(sqlNode).rel;
+ }
+
+ private SqlNode validateNode(SqlNode sqlNode) throws ValidationException {
+ SqlNode validatedSqlNode = planner.validate(sqlNode);
+ validatedSqlNode.accept(new UnsupportedOperatorsVisitor());
+ return validatedSqlNode;
+ }
+
+ public Map<String, BaseBeamTable> getSourceTables() {
+ return sourceTables;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
new file mode 100644
index 0000000..bf35296
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRelDataTypeSystem.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+/**
+ * customized data type in Beam.
+ *
+ */
+public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl {
+ public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new BeamRelDataTypeSystem();
+
+ @Override
+ public int getMaxNumericScale() {
+ return 38;
+ }
+
+ @Override
+ public int getMaxNumericPrecision() {
+ return 38;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java
new file mode 100644
index 0000000..3f40c27
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamRuleSets.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.Iterator;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.tools.RuleSet;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.beam.dsls.sql.rule.BeamFilterRule;
+import org.beam.dsls.sql.rule.BeamIOSinkRule;
+import org.beam.dsls.sql.rule.BeamIOSourceRule;
+import org.beam.dsls.sql.rule.BeamProjectRule;
+
+/**
+ * {@link RuleSet} used in {@link BeamQueryPlanner}. It translates a standard
+ * Calcite {@link RelNode} tree, to represent with {@link BeamRelNode}
+ *
+ */
+public class BeamRuleSets {
+ private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet
+ .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
+ BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE)
+ .build();
+
+ public static RuleSet[] getRuleSets() {
+ return new RuleSet[] { new BeamRuleSet(
+ ImmutableSet.<RelOptRule>builder().addAll(calciteToBeamConversionRules).build()) };
+ }
+
+ private static class BeamRuleSet implements RuleSet {
+ final ImmutableSet<RelOptRule> rules;
+
+ public BeamRuleSet(ImmutableSet<RelOptRule> rules) {
+ this.rules = rules;
+ }
+
+ public BeamRuleSet(ImmutableList<RelOptRule> rules) {
+ this.rules = ImmutableSet.<RelOptRule>builder().addAll(rules).build();
+ }
+
+ @Override
+ public Iterator<RelOptRule> iterator() {
+ return rules.iterator();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java
new file mode 100644
index 0000000..94b341c
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSQLRelUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for {@code BeamRelNode}.
+ */
+public class BeamSQLRelUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamSQLRelUtils.class);
+
+ private static final AtomicInteger sequence = new AtomicInteger(0);
+ private static final AtomicInteger classSequence = new AtomicInteger(0);
+
+ public static String getStageName(BeamRelNode relNode) {
+ return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
+ + sequence.getAndIncrement();
+ }
+
+ public static String getClassName(BeamRelNode relNode) {
+ return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
+ + "_" + classSequence.getAndIncrement();
+ }
+
+ public static BeamRelNode getBeamRelInput(RelNode input) {
+ if (input instanceof RelSubset) {
+ // go with known best input
+ input = ((RelSubset) input).getBest();
+ }
+ return (BeamRelNode) input;
+ }
+
+ public static String explain(final RelNode rel) {
+ return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+ }
+
+ public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
+ String explain = "";
+ try {
+ explain = RelOptUtil.toString(rel);
+ } catch (StackOverflowError e) {
+ LOG.error("StackOverflowError occurred while extracting plan. "
+ + "Please report it to the dev@ mailing list.");
+ LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+ LOG.error("Forcing plan to empty string and continue... "
+ + "SQL Runner may not working properly after.");
+ }
+ return explain;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java
new file mode 100644
index 0000000..9581fcd
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlRunner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import java.io.Serializable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.beam.dsls.sql.rel.BeamRelNode;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Interface to explain, submit a SQL query.
+ *
+ */
+public class BeamSqlRunner implements Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -4708693435115005182L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRunner.class);
+
+ private SchemaPlus schema = Frameworks.createRootSchema(true);
+
+ private BeamQueryPlanner planner = new BeamQueryPlanner(schema);
+
+ /**
+ * Add a schema.
+ *
+ */
+ public void addSchema(String schemaName, Schema scheme) {
+ schema.add(schemaName, schema);
+ }
+
+ /**
+ * add a {@link BaseBeamTable} to schema repository.
+ *
+ */
+ public void addTable(String tableName, BaseBeamTable table) {
+ schema.add(tableName, table);
+ planner.getSourceTables().put(tableName, table);
+ }
+
+ /**
+ * submit as a Beam pipeline.
+ *
+ */
+ public void submitQuery(String sqlString) throws Exception {
+ planner.submitToRun(sqlString);
+ planner.planner.close();
+ }
+
+ /**
+ * explain and display the execution plan.
+ *
+ */
+ public String explainQuery(String sqlString)
+ throws ValidationException, RelConversionException, SqlParseException {
+ BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
+ String beamPlan = RelOptUtil.toString(exeTree);
+ System.out.println(String.format("beamPlan>\n%s", beamPlan));
+
+ planner.planner.close();
+ return beamPlan;
+ }
+
+ protected BeamQueryPlanner getPlanner() {
+ return planner;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
new file mode 100644
index 0000000..a3475bb
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/BeamSqlUnsupportedException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+/**
+ * Generic exception for un-supported operations.
+ *
+ */
+public class BeamSqlUnsupportedException extends RuntimeException {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 3445015747629217342L;
+
+ public BeamSqlUnsupportedException(String string) {
+ super(string);
+ }
+
+ public BeamSqlUnsupportedException() {
+ super();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
new file mode 100644
index 0000000..702381d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.planner;
+
+import org.apache.calcite.sql.util.SqlShuttle;
+
+/**
+ * Unsupported operation to visit a RelNode.
+ *
+ */
+public class UnsupportedOperatorsVisitor extends SqlShuttle {
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
new file mode 100644
index 0000000..d98c584
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.beam.dsls.sql.planner.BeamQueryPlanner} is the main interface.
+ * It defines data sources, validate a SQL statement, and convert it as a Beam
+ * pipeline.
+ */
+package org.beam.dsls.sql.planner;
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java
new file mode 100644
index 0000000..64f2d1f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.beam.dsls.sql.transform.BeamSQLFilterFn;
+
+/**
+ * BeamRelNode to replace a {@code Filter} node.
+ *
+ */
+public class BeamFilterRel extends Filter implements BeamRelNode {
+
+ public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
+ RexNode condition) {
+ super(cluster, traits, child, condition);
+ }
+
+ @Override
+ public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+ return new BeamFilterRel(getCluster(), traitSet, input, condition);
+ }
+
+ @Override
+ public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+
+ RelNode input = getInput();
+ BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+
+ String stageName = BeamSQLRelUtils.getStageName(this);
+
+ PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+
+ BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
+
+ PCollection<BeamSQLRow> projectStream = upstream.apply(stageName,
+ ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
+
+ planCreator.setLatestStream(projectStream);
+
+ return planCreator.getPipeline();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
new file mode 100644
index 0000000..46654e5
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import com.google.common.base.Joiner;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * BeamRelNode to replace a {@code TableModify} node.
+ *
+ */
+public class BeamIOSinkRel extends TableModify implements BeamRelNode {
+ public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
+ Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
+ List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
+ super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
+ sourceExpressionList, flattened);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs),
+ getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
+ }
+
+ @Override
+ public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+
+ RelNode input = getInput();
+ BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+
+ String stageName = BeamSQLRelUtils.getStageName(this);
+
+ PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+
+ String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
+
+ BaseBeamTable targetTable = planCreator.getSourceTables().get(sourceName);
+
+ upstream.apply(stageName, targetTable.buildIOWriter());
+
+ planCreator.setHasPersistent(true);
+
+ return planCreator.getPipeline();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
new file mode 100644
index 0000000..f14db92
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import com.google.common.base.Joiner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.TableScan;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.beam.dsls.sql.schema.BaseBeamTable;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+
+/**
+ * BeamRelNode to replace a {@code TableScan} node.
+ *
+ */
+public class BeamIOSourceRel extends TableScan implements BeamRelNode {
+
+ public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+ super(cluster, traitSet, table);
+ }
+
+ @Override
+ public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+
+ String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", "");
+
+ BaseBeamTable sourceTable = planCreator.getSourceTables().get(sourceName);
+
+ String stageName = BeamSQLRelUtils.getStageName(this);
+
+ PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName,
+ sourceTable.buildIOReader());
+
+ planCreator.setLatestStream(sourceStream);
+
+ return planCreator.getPipeline();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
new file mode 100644
index 0000000..50fe8e0
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+
+/**
+ * Convertion for Beam SQL.
+ *
+ */
+public enum BeamLogicalConvention implements Convention {
+ INSTANCE;
+
+ @Override
+ public Class getInterface() {
+ return BeamRelNode.class;
+ }
+
+ @Override
+ public String getName() {
+ return "BEAM_LOGICAL";
+ }
+
+ @Override
+ public RelTraitDef getTraitDef() {
+ return ConventionTraitDef.INSTANCE;
+ }
+
+ @Override
+ public boolean satisfies(RelTrait trait) {
+ return this == trait;
+ }
+
+ @Override
+ public void register(RelOptPlanner planner) {
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+
+ @Override
+ public boolean canConvertConvention(Convention toConvention) {
+ return false;
+ }
+
+ @Override
+ public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
new file mode 100644
index 0000000..e41d74e
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
+import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.beam.dsls.sql.schema.BeamSQLRow;
+import org.beam.dsls.sql.transform.BeamSQLProjectFn;
+
+/**
+ * BeamRelNode to replace a {@code Project} node.
+ *
+ */
+public class BeamProjectRel extends Project implements BeamRelNode {
+
+ /**
+ * projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}.
+ *
+ */
+ public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+ List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traits, input, projects, rowType);
+ }
+
+ @Override
+ public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
+ RelDataType rowType) {
+ return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
+ }
+
+ @Override
+ public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+ RelNode input = getInput();
+ BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+
+ String stageName = BeamSQLRelUtils.getStageName(this);
+
+ PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
+
+ BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
+
+ PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
+ .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
+
+ planCreator.setLatestStream(projectStream);
+
+ return planCreator.getPipeline();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
new file mode 100644
index 0000000..07ffee5
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rel;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.calcite.rel.RelNode;
+import org.beam.dsls.sql.planner.BeamPipelineCreator;
+
+/**
+ * A new method {@link #buildBeamPipeline(BeamPipelineCreator)} is added, it's
+ * called by {@link BeamPipelineCreator}.
+ *
+ */
+public interface BeamRelNode extends RelNode {
+
+ /**
+ * A {@link BeamRelNode} is a recursive structure, the
+ * {@link BeamPipelineCreator} visits it with a DFS(Depth-First-Search)
+ * algorithm.
+ *
+ */
+ Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
new file mode 100644
index 0000000..13dc962
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}.
+ *
+ */
+package org.beam.dsls.sql.rel;
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
new file mode 100644
index 0000000..2ad7c07
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.beam.dsls.sql.rel.BeamFilterRel;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+
+/**
+ * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
+ *
+ */
+public class BeamFilterRule extends ConverterRule {
+ public static final BeamFilterRule INSTANCE = new BeamFilterRule();
+
+ private BeamFilterRule() {
+ super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Filter filter = (Filter) rel;
+ final RelNode input = filter.getInput();
+
+ return new BeamFilterRel(filter.getCluster(),
+ filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ filter.getCondition());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
new file mode 100644
index 0000000..a44c002
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Table;
+import org.beam.dsls.sql.rel.BeamIOSinkRel;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableModify} with
+ * {@link BeamIOSinkRel}.
+ *
+ */
+public class BeamIOSinkRule extends ConverterRule {
+ public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
+
+ private BeamIOSinkRule() {
+ super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+ "BeamIOSinkRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableModify tableModify = (TableModify) rel;
+ final RelNode input = tableModify.getInput();
+
+ final RelOptCluster cluster = tableModify.getCluster();
+ final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
+ final RelOptTable relOptTable = tableModify.getTable();
+ final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
+ final RelNode convertedInput = convert(input,
+ input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+ final TableModify.Operation operation = tableModify.getOperation();
+ final List<String> updateColumnList = tableModify.getUpdateColumnList();
+ final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
+ final boolean flattened = tableModify.isFlattened();
+
+ final Table table = tableModify.getTable().unwrap(Table.class);
+
+ switch (table.getJdbcTableType()) {
+ case TABLE:
+ case STREAM:
+ if (operation != TableModify.Operation.INSERT) {
+ throw new UnsupportedOperationException(
+ String.format("Streams doesn't support %s modify operation", operation));
+ }
+ return new BeamIOSinkRel(cluster, traitSet,
+ relOptTable, catalogReader, convertedInput, operation, updateColumnList,
+ sourceExpressionList, flattened);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported table type: %s", table.getJdbcTableType()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
new file mode 100644
index 0000000..9e4778b
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.beam.dsls.sql.rel.BeamIOSourceRel;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+
+/**
+ * A {@code ConverterRule} to replace {@link TableScan} with
+ * {@link BeamIOSourceRel}.
+ *
+ */
+public class BeamIOSourceRule extends ConverterRule {
+ public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
+
+ private BeamIOSourceRule() {
+ super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+ "BeamIOSourceRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final TableScan scan = (TableScan) rel;
+
+ return new BeamIOSourceRel(scan.getCluster(),
+ scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
new file mode 100644
index 0000000..117a056
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.beam.dsls.sql.rule;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.beam.dsls.sql.rel.BeamProjectRel;
+
+/**
+ * A {@code ConverterRule} to replace {@link Project} with
+ * {@link BeamProjectRel}.
+ *
+ */
+public class BeamProjectRule extends ConverterRule {
+ public static final BeamProjectRule INSTANCE = new BeamProjectRule();
+
+ private BeamProjectRule() {
+ super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ final Project project = (Project) rel;
+ final RelNode input = project.getInput();
+
+ return new BeamProjectRel(project.getCluster(),
+ project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+ convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+ project.getProjects(), project.getRowType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7867ce62/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
new file mode 100644
index 0000000..56ddcf3
--- /dev/null
+++ b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.calcite.plan.RelOptRule} to generate {@link org.beam.dsls.sql.rel.BeamRelNode}.
+ */
+package org.beam.dsls.sql.rule;