You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:29 UTC
[06/59] beam git commit: move dsls/sql to sdks/java/extensions/sql
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
new file mode 100644
index 0000000..2066353
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Interface to create a UDF in Beam SQL.
+ *
+ * <p>A static method {@code eval} is required. Here is an example:
+ *
+ * <blockquote><pre>
+ * public static class MyLeftFunction {
+ * public static String eval(
+ * @Parameter(name = "s") String s,
+ * @Parameter(name = "n", optional = true) Integer n) {
+ * return s.substring(0, n == null ? 1 : n);
+ * }
+ * }</pre></blockquote>
+ *
+ * <p>The first parameter is named "s" and is mandatory,
+ * and the second parameter is named "n" and is optional.
+ */
+public interface BeamSqlUdf extends Serializable {
+ String UDF_METHOD = "eval";
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
new file mode 100644
index 0000000..4b7e76b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.math.BigDecimal;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * Utility methods for working with {@code BeamTable}.
+ */
+public final class BeamTableUtils {
+ public static BeamSqlRow csvLine2BeamSqlRow(
+ CSVFormat csvFormat,
+ String line,
+ BeamSqlRowType beamSqlRowType) {
+ BeamSqlRow row = new BeamSqlRow(beamSqlRowType);
+ try (StringReader reader = new StringReader(line)) {
+ CSVParser parser = csvFormat.parse(reader);
+ CSVRecord rawRecord = parser.getRecords().get(0);
+
+ if (rawRecord.size() != beamSqlRowType.size()) {
+ throw new IllegalArgumentException(String.format(
+ "Expect %d fields, but actually %d",
+ beamSqlRowType.size(), rawRecord.size()
+ ));
+ } else {
+ for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
+ String raw = rawRecord.get(idx);
+ addFieldWithAutoTypeCasting(row, idx, raw);
+ }
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException("decodeRecord failed!", e);
+ }
+ return row;
+ }
+
+ public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) {
+ StringWriter writer = new StringWriter();
+ try (CSVPrinter printer = csvFormat.print(writer)) {
+ for (int i = 0; i < row.size(); i++) {
+ printer.print(row.getFieldValue(i).toString());
+ }
+ printer.println();
+ } catch (IOException e) {
+ throw new IllegalArgumentException("encodeRecord failed!", e);
+ }
+ return writer.toString();
+ }
+
+ public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
+ if (rawObj == null) {
+ row.addField(idx, null);
+ return;
+ }
+
+ SqlTypeName columnType = CalciteUtils.getFieldType(row.getDataType(), idx);
+ // auto-casting for numerics
+ if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
+ || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
+ String raw = rawObj.toString();
+ switch (columnType) {
+ case TINYINT:
+ row.addField(idx, Byte.valueOf(raw));
+ break;
+ case SMALLINT:
+ row.addField(idx, Short.valueOf(raw));
+ break;
+ case INTEGER:
+ row.addField(idx, Integer.valueOf(raw));
+ break;
+ case BIGINT:
+ row.addField(idx, Long.valueOf(raw));
+ break;
+ case FLOAT:
+ row.addField(idx, Float.valueOf(raw));
+ break;
+ case DOUBLE:
+ row.addField(idx, Double.valueOf(raw));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Column type %s is not supported yet!", columnType));
+ }
+ } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
+ // convert NlsString to String
+ if (rawObj instanceof NlsString) {
+ row.addField(idx, ((NlsString) rawObj).getValue());
+ } else {
+ row.addField(idx, rawObj);
+ }
+ } else {
+ // keep the origin
+ row.addField(idx, rawObj);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
new file mode 100644
index 0000000..a18f3de
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema.kafka;
+
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * A Kafka topic that saves records in CSV format.
+ *
+ */
+public class BeamKafkaCSVTable extends BeamKafkaTable {
+ private CSVFormat csvFormat;
+ public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+ List<String> topics) {
+ this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
+ }
+
+ public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+ List<String> topics, CSVFormat format) {
+ super(beamSqlRowType, bootstrapServers, topics);
+ this.csvFormat = format;
+ }
+
+ @Override
+ public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+ getPTransformForInput() {
+ return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
+ }
+
+ @Override
+ public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+ getPTransformForOutput() {
+ return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
+ }
+
+ /**
+ * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}.
+ *
+ */
+ public static class CsvRecorderDecoder
+ extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
+ private BeamSqlRowType rowType;
+ private CSVFormat format;
+ public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) {
+ this.rowType = rowType;
+ this.format = format;
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) {
+ return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ String rowInString = new String(c.element().getValue());
+ c.output(csvLine2BeamSqlRow(format, rowInString, rowType));
+ }
+ }));
+ }
+ }
+
+ /**
+ * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}.
+ *
+ */
+ public static class CsvRecorderEncoder
+ extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
+ private BeamSqlRowType rowType;
+ private CSVFormat format;
+ public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) {
+ this.rowType = rowType;
+ this.format = format;
+ }
+
+ @Override
+ public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) {
+ return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ BeamSqlRow in = c.element();
+ c.output(KV.of(new byte[] {}, beamSqlRow2CsvLine(in, format).getBytes()));
+ }
+ }));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
new file mode 100644
index 0000000..faa2706
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+/**
+ * {@code BeamKafkaTable} represents a Kafka topic, as source or target. Need to
+ * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
+ *
+ */
+public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
+
+ private String bootstrapServers;
+ private List<String> topics;
+ private Map<String, Object> configUpdates;
+
+ protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) {
+ super(beamSqlRowType);
+ }
+
+ public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+ List<String> topics) {
+ super(beamSqlRowType);
+ this.bootstrapServers = bootstrapServers;
+ this.topics = topics;
+ }
+
+ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
+ this.configUpdates = configUpdates;
+ return this;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.UNBOUNDED;
+ }
+
+ public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+ getPTransformForInput();
+
+ public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+ getPTransformForOutput();
+
+ @Override
+ public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ return PBegin.in(pipeline).apply("read",
+ KafkaIO.<byte[], byte[]>read()
+ .withBootstrapServers(bootstrapServers)
+ .withTopics(topics)
+ .updateConsumerProperties(configUpdates)
+ .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+ .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+ .withoutMetadata())
+ .apply("in_format", getPTransformForInput());
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ checkArgument(topics != null && topics.size() == 1,
+ "Only one topic can be acceptable as output.");
+
+ return new PTransform<PCollection<BeamSqlRow>, PDone>() {
+ @Override
+ public PDone expand(PCollection<BeamSqlRow> input) {
+ return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
+ KafkaIO.<byte[], byte[]>write()
+ .withBootstrapServers(bootstrapServers)
+ .withTopic(topics.get(0))
+ .withKeySerializer(ByteArraySerializer.class)
+ .withValueSerializer(ByteArraySerializer.class));
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java
new file mode 100644
index 0000000..0418372
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema for KafkaIO.
+ */
+package org.apache.beam.dsls.sql.schema.kafka;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java
new file mode 100644
index 0000000..4c41826
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines the table schema, to map with Beam IO components.
+ *
+ */
+package org.apache.beam.dsls.sql.schema;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
new file mode 100644
index 0000000..9ed56b4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code BeamTextCSVTable} is a {@code BeamTextTable} which is formatted in CSV.
+ *
+ * <p>
+ * {@link CSVFormat} itself has many dialects, check its javadoc for more info.
+ * </p>
+ */
+public class BeamTextCSVTable extends BeamTextTable {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(BeamTextCSVTable.class);
+
+ private CSVFormat csvFormat;
+
+ /**
+ * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
+ */
+ public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern) {
+ this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
+ }
+
+ public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern,
+ CSVFormat csvFormat) {
+ super(beamSqlRowType, filePattern);
+ this.csvFormat = csvFormat;
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
+ .apply("parseCSVLine",
+ new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
new file mode 100644
index 0000000..874c3e4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
+
+import java.io.Serializable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * IOReader for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableIOReader
+ extends PTransform<PCollection<String>, PCollection<BeamSqlRow>>
+ implements Serializable {
+ private String filePattern;
+ protected BeamSqlRowType beamSqlRowType;
+ protected CSVFormat csvFormat;
+
+ public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern,
+ CSVFormat csvFormat) {
+ this.filePattern = filePattern;
+ this.beamSqlRowType = beamSqlRowType;
+ this.csvFormat = csvFormat;
+ }
+
+ @Override
+ public PCollection<BeamSqlRow> expand(PCollection<String> input) {
+ return input.apply(ParDo.of(new DoFn<String, BeamSqlRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext ctx) {
+ String str = ctx.element();
+ ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType));
+ }
+ }));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
new file mode 100644
index 0000000..f61bb71
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
+
+import java.io.Serializable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * IOWriter for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
+ implements Serializable {
+ private String filePattern;
+ protected BeamSqlRowType beamSqlRowType;
+ protected CSVFormat csvFormat;
+
+ public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern,
+ CSVFormat csvFormat) {
+ this.filePattern = filePattern;
+ this.beamSqlRowType = beamSqlRowType;
+ this.csvFormat = csvFormat;
+ }
+
+ @Override public PDone expand(PCollection<BeamSqlRow> input) {
+ return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, String>() {
+
+ @ProcessElement public void processElement(ProcessContext ctx) {
+ BeamSqlRow row = ctx.element();
+ ctx.output(beamSqlRow2CsvLine(row, csvFormat));
+ }
+ })).apply(TextIO.write().to(filePattern));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
new file mode 100644
index 0000000..6dc6cd0
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.Serializable;
+
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+
+/**
+ * {@code BeamTextTable} represents a text file/directory (backed by {@code TextIO}).
+ */
+public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
+ protected String filePattern;
+
+ protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) {
+ super(beamSqlRowType);
+ this.filePattern = filePattern;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.BOUNDED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
new file mode 100644
index 0000000..f48f2fe
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema for text files.
+ */
+package org.apache.beam.dsls.sql.schema.text;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
new file mode 100644
index 0000000..5b21765
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.transform;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.joda.time.Instant;
+
+/**
+ * Collections of {@code PTransform} and {@code DoFn} used to perform GROUP-BY operation.
+ */
+public class BeamAggregationTransforms implements Serializable{
+ /**
+ * Merge KV to single record.
+ *
+ * <p>Flattens a {@code KV<groupKey, aggregatedValues>} pair into one wide output row:
+ * key fields first (matched by name), then one field per aggregation call, then the
+ * window start timestamp when requested.
+ */
+ public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
+ private BeamSqlRowType outRowType;
+ private List<String> aggFieldNames;
+ private int windowStartFieldIdx;
+
+ public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList
+ , int windowStartFieldIdx) {
+ this.outRowType = outRowType;
+ this.aggFieldNames = new ArrayList<>();
+ for (AggregateCall ac : aggList) {
+ aggFieldNames.add(ac.getName());
+ }
+ this.windowStartFieldIdx = windowStartFieldIdx;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ BeamSqlRow outRecord = new BeamSqlRow(outRowType);
+ outRecord.updateWindowRange(c.element().getKey(), window);
+
+ KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
+ // Copy the grouping-key fields by name from the key row.
+ for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
+ outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
+ }
+ // Copy aggregation results positionally from the value row.
+ for (int idx = 0; idx < aggFieldNames.size(); ++idx) {
+ outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx));
+ }
+ // windowStartFieldIdx == -1 means no window-start column was projected (global window).
+ if (windowStartFieldIdx != -1) {
+ outRecord.addField(windowStartFieldIdx, outRecord.getWindowStart().toDate());
+ }
+
+ c.output(outRecord);
+ }
+ }
+
+ /**
+ * extract group-by fields.
+ *
+ * <p>Projects the GROUP BY columns (excluding the window field, if any) out of an
+ * input row to form the grouping key.
+ */
+ public static class AggregationGroupByKeyFn
+ implements SerializableFunction<BeamSqlRow, BeamSqlRow> {
+ private List<Integer> groupByKeys;
+
+ public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
+ this.groupByKeys = new ArrayList<>();
+ // The windowing column is handled by Beam's windowing, not as a key field.
+ for (int i : groupSet.asList()) {
+ if (i != windowFieldIdx) {
+ groupByKeys.add(i);
+ }
+ }
+ }
+
+ @Override
+ public BeamSqlRow apply(BeamSqlRow input) {
+ BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType());
+ BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
+ keyOfRecord.updateWindowRange(input, null);
+
+ for (int idx = 0; idx < groupByKeys.size(); ++idx) {
+ keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
+ }
+ return keyOfRecord;
+ }
+
+ // Builds the row type of the key: the name/type of each group-by column, in order.
+ private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
+ List<String> fieldNames = new ArrayList<>();
+ List<Integer> fieldTypes = new ArrayList<>();
+ for (int idx : groupByKeys) {
+ fieldNames.add(dataType.getFieldsName().get(idx));
+ fieldTypes.add(dataType.getFieldsType().get(idx));
+ }
+ return BeamSqlRowType.create(fieldNames, fieldTypes);
+ }
+ }
+
+ /**
+ * Assign event timestamp.
+ *
+ * <p>Reads the event-time value from the configured column of each row.
+ */
+ public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> {
+ private int windowFieldIdx = -1;
+
+ public WindowTimestampFn(int windowFieldIdx) {
+ super();
+ this.windowFieldIdx = windowFieldIdx;
+ }
+
+ @Override
+ public Instant apply(BeamSqlRow input) {
+ return new Instant(input.getDate(windowFieldIdx).getTime());
+ }
+ }
+
+ /**
+ * An adaptor class to invoke Calcite UDAF instances in Beam {@code CombineFn}.
+ *
+ * <p>Holds one {@link BeamSqlUdaf} per aggregation call and fans each input row out
+ * to all of them; the combined accumulator is the list of their accumulators.
+ */
+ public static class AggregationAdaptor
+ extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> {
+ private List<BeamSqlUdaf> aggregators;
+ private List<BeamSqlExpression> sourceFieldExps;
+ private BeamSqlRowType finalRowType;
+
+ public AggregationAdaptor(List<AggregateCall> aggregationCalls,
+ BeamSqlRowType sourceRowType) {
+ aggregators = new ArrayList<>();
+ sourceFieldExps = new ArrayList<>();
+ List<String> outFieldsName = new ArrayList<>();
+ List<Integer> outFieldsType = new ArrayList<>();
+ for (AggregateCall call : aggregationCalls) {
+ // COUNT(*) has an empty arg list; field 0 is used as a placeholder input ref.
+ int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0;
+ BeamSqlExpression sourceExp = new BeamSqlInputRefExpression(
+ CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex);
+ sourceFieldExps.add(sourceExp);
+
+ outFieldsName.add(call.name);
+ int outFieldType = CalciteUtils.toJavaType(call.type.getSqlTypeName());
+ outFieldsType.add(outFieldType);
+
+ switch (call.getAggregation().getName()) {
+ case "COUNT":
+ aggregators.add(new BeamBuiltinAggregations.Count());
+ break;
+ case "MAX":
+ aggregators.add(BeamBuiltinAggregations.Max.create(call.type.getSqlTypeName()));
+ break;
+ case "MIN":
+ aggregators.add(BeamBuiltinAggregations.Min.create(call.type.getSqlTypeName()));
+ break;
+ case "SUM":
+ aggregators.add(BeamBuiltinAggregations.Sum.create(call.type.getSqlTypeName()));
+ break;
+ case "AVG":
+ aggregators.add(BeamBuiltinAggregations.Avg.create(call.type.getSqlTypeName()));
+ break;
+ default:
+ if (call.getAggregation() instanceof SqlUserDefinedAggFunction) {
+ // handle UDAF.
+ SqlUserDefinedAggFunction udaf = (SqlUserDefinedAggFunction) call.getAggregation();
+ AggregateFunctionImpl fn = (AggregateFunctionImpl) udaf.function;
+ try {
+ // The UDAF class must have a public no-arg constructor.
+ aggregators.add((BeamSqlUdaf) fn.declaringClass.newInstance());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Aggregator [%s] is not supported",
+ call.getAggregation().getName()));
+ }
+ break;
+ }
+ }
+ finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType);
+ }
+ @Override
+ public AggregationAccumulator createAccumulator() {
+ AggregationAccumulator initialAccu = new AggregationAccumulator();
+ for (BeamSqlUdaf agg : aggregators) {
+ initialAccu.accumulatorElements.add(agg.init());
+ }
+ return initialAccu;
+ }
+ @Override
+ public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamSqlRow input) {
+ AggregationAccumulator deltaAcc = new AggregationAccumulator();
+ // Element i of the accumulator belongs to aggregators.get(i).
+ for (int idx = 0; idx < aggregators.size(); ++idx) {
+ deltaAcc.accumulatorElements.add(
+ aggregators.get(idx).add(accumulator.accumulatorElements.get(idx),
+ sourceFieldExps.get(idx).evaluate(input).getValue()));
+ }
+ return deltaAcc;
+ }
+ @Override
+ public AggregationAccumulator mergeAccumulators(Iterable<AggregationAccumulator> accumulators) {
+ AggregationAccumulator deltaAcc = new AggregationAccumulator();
+ for (int idx = 0; idx < aggregators.size(); ++idx) {
+ // Collect the idx-th element from every partial accumulator, then merge them.
+ List accs = new ArrayList<>();
+ Iterator<AggregationAccumulator> ite = accumulators.iterator();
+ while (ite.hasNext()) {
+ accs.add(ite.next().accumulatorElements.get(idx));
+ }
+ deltaAcc.accumulatorElements.add(aggregators.get(idx).merge(accs));
+ }
+ return deltaAcc;
+ }
+ @Override
+ public BeamSqlRow extractOutput(AggregationAccumulator accumulator) {
+ BeamSqlRow result = new BeamSqlRow(finalRowType);
+ for (int idx = 0; idx < aggregators.size(); ++idx) {
+ result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
+ }
+ return result;
+ }
+ @Override
+ public Coder<AggregationAccumulator> getAccumulatorCoder(
+ CoderRegistry registry, Coder<BeamSqlRow> inputCoder)
+ throws CannotProvideCoderException {
+ registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of());
+ List<Coder> aggAccuCoderList = new ArrayList<>();
+ for (BeamSqlUdaf udaf : aggregators) {
+ aggAccuCoderList.add(udaf.getAccumulatorCoder(registry));
+ }
+ return new AggregationAccumulatorCoder(aggAccuCoderList);
+ }
+ }
+
+ /**
+ * A class to holder varied accumulator objects.
+ */
+ public static class AggregationAccumulator{
+ // Raw-typed on purpose: element i is the accumulator of aggregators.get(i),
+ // whose accumulator type varies per aggregation.
+ private List accumulatorElements = new ArrayList<>();
+ }
+
+ /**
+ * Coder for {@link AggregationAccumulator}.
+ */
+ public static class AggregationAccumulatorCoder extends CustomCoder<AggregationAccumulator>{
+ private VarIntCoder sizeCoder = VarIntCoder.of();
+ private List<Coder> elementCoders;
+
+ public AggregationAccumulatorCoder(List<Coder> elementCoders) {
+ this.elementCoders = elementCoders;
+ }
+
+ @Override
+ public void encode(AggregationAccumulator value, OutputStream outStream)
+ throws CoderException, IOException {
+ // Prefix with the element count so decode() knows how many accumulators follow.
+ sizeCoder.encode(value.accumulatorElements.size(), outStream);
+ for (int idx = 0; idx < value.accumulatorElements.size(); ++idx) {
+ elementCoders.get(idx).encode(value.accumulatorElements.get(idx), outStream);
+ }
+ }
+
+ @Override
+ public AggregationAccumulator decode(InputStream inStream) throws CoderException, IOException {
+ AggregationAccumulator accu = new AggregationAccumulator();
+ int size = sizeCoder.decode(inStream);
+ for (int idx = 0; idx < size; ++idx) {
+ accu.accumulatorElements.add(elementCoders.get(idx).decode(inStream));
+ }
+ return accu;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java
new file mode 100644
index 0000000..fab2666
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.transform;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.Iterator;
+import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Built-in aggregations functions for COUNT/MAX/MIN/SUM/AVG.
+ */
+class BeamBuiltinAggregations {
+ /**
+ * Built-in aggregation for COUNT.
+ *
+ * <p>The accumulator is a running {@code Long} count; the input value is ignored.
+ */
+ public static final class Count<T> extends BeamSqlUdaf<T, Long, Long> {
+ public Count() {}
+
+ @Override
+ public Long init() {
+ return 0L;
+ }
+
+ @Override
+ public Long add(Long accumulator, T input) {
+ return accumulator + 1;
+ }
+
+ @Override
+ public Long merge(Iterable<Long> accumulators) {
+ long v = 0L;
+ Iterator<Long> ite = accumulators.iterator();
+ while (ite.hasNext()) {
+ v += ite.next();
+ }
+ return v;
+ }
+
+ @Override
+ public Long result(Long accumulator) {
+ return accumulator;
+ }
+ }
+
+ /**
+ * Built-in aggregation for MAX.
+ *
+ * <p>{@code null} accumulator means "no value seen yet"; otherwise keeps the larger
+ * of accumulator and input per {@link Comparable#compareTo}.
+ */
+ public static final class Max<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> {
+ public static Max create(SqlTypeName fieldType) {
+ switch (fieldType) {
+ case INTEGER:
+ return new BeamBuiltinAggregations.Max<Integer>(fieldType);
+ case SMALLINT:
+ return new BeamBuiltinAggregations.Max<Short>(fieldType);
+ case TINYINT:
+ return new BeamBuiltinAggregations.Max<Byte>(fieldType);
+ case BIGINT:
+ return new BeamBuiltinAggregations.Max<Long>(fieldType);
+ case FLOAT:
+ return new BeamBuiltinAggregations.Max<Float>(fieldType);
+ case DOUBLE:
+ return new BeamBuiltinAggregations.Max<Double>(fieldType);
+ case TIMESTAMP:
+ return new BeamBuiltinAggregations.Max<Date>(fieldType);
+ case DECIMAL:
+ return new BeamBuiltinAggregations.Max<BigDecimal>(fieldType);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("[%s] is not support in MAX", fieldType));
+ }
+ }
+
+ private final SqlTypeName fieldType;
+ private Max(SqlTypeName fieldType) {
+ this.fieldType = fieldType;
+ }
+
+ @Override
+ public T init() {
+ return null;
+ }
+
+ @Override
+ public T add(T accumulator, T input) {
+ return (accumulator == null || accumulator.compareTo(input) < 0) ? input : accumulator;
+ }
+
+ @Override
+ public T merge(Iterable<T> accumulators) {
+ // NOTE(review): assumes a non-empty iterable; merge() with zero partials would NPE
+ // on the first next() — TODO confirm callers guarantee at least one accumulator.
+ Iterator<T> ite = accumulators.iterator();
+ T mergedV = ite.next();
+ while (ite.hasNext()) {
+ T v = ite.next();
+ mergedV = mergedV.compareTo(v) > 0 ? mergedV : v;
+ }
+ return mergedV;
+ }
+
+ @Override
+ public T result(T accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException {
+ return BeamBuiltinAggregations.getSqlTypeCoder(fieldType);
+ }
+ }
+
+ /**
+ * Built-in aggregation for MIN.
+ *
+ * <p>Mirror image of {@link Max}: keeps the smaller of accumulator and input.
+ */
+ public static final class Min<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> {
+ public static Min create(SqlTypeName fieldType) {
+ switch (fieldType) {
+ case INTEGER:
+ return new BeamBuiltinAggregations.Min<Integer>(fieldType);
+ case SMALLINT:
+ return new BeamBuiltinAggregations.Min<Short>(fieldType);
+ case TINYINT:
+ return new BeamBuiltinAggregations.Min<Byte>(fieldType);
+ case BIGINT:
+ return new BeamBuiltinAggregations.Min<Long>(fieldType);
+ case FLOAT:
+ return new BeamBuiltinAggregations.Min<Float>(fieldType);
+ case DOUBLE:
+ return new BeamBuiltinAggregations.Min<Double>(fieldType);
+ case TIMESTAMP:
+ return new BeamBuiltinAggregations.Min<Date>(fieldType);
+ case DECIMAL:
+ return new BeamBuiltinAggregations.Min<BigDecimal>(fieldType);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("[%s] is not support in MIN", fieldType));
+ }
+ }
+
+ private final SqlTypeName fieldType;
+ private Min(SqlTypeName fieldType) {
+ this.fieldType = fieldType;
+ }
+
+ @Override
+ public T init() {
+ return null;
+ }
+
+ @Override
+ public T add(T accumulator, T input) {
+ return (accumulator == null || accumulator.compareTo(input) > 0) ? input : accumulator;
+ }
+
+ @Override
+ public T merge(Iterable<T> accumulators) {
+ Iterator<T> ite = accumulators.iterator();
+ T mergedV = ite.next();
+ while (ite.hasNext()) {
+ T v = ite.next();
+ mergedV = mergedV.compareTo(v) < 0 ? mergedV : v;
+ }
+ return mergedV;
+ }
+
+ @Override
+ public T result(T accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException {
+ return BeamBuiltinAggregations.getSqlTypeCoder(fieldType);
+ }
+ }
+
+ /**
+ * Built-in aggregation for SUM.
+ *
+ * <p>Accumulates in {@link BigDecimal} for exactness, then narrows back to the
+ * column's Java type in {@link #result}.
+ */
+ public static final class Sum<T> extends BeamSqlUdaf<T, BigDecimal, T> {
+ public static Sum create(SqlTypeName fieldType) {
+ switch (fieldType) {
+ case INTEGER:
+ return new BeamBuiltinAggregations.Sum<Integer>(fieldType);
+ case SMALLINT:
+ return new BeamBuiltinAggregations.Sum<Short>(fieldType);
+ case TINYINT:
+ return new BeamBuiltinAggregations.Sum<Byte>(fieldType);
+ case BIGINT:
+ return new BeamBuiltinAggregations.Sum<Long>(fieldType);
+ case FLOAT:
+ return new BeamBuiltinAggregations.Sum<Float>(fieldType);
+ case DOUBLE:
+ return new BeamBuiltinAggregations.Sum<Double>(fieldType);
+ case TIMESTAMP:
+ return new BeamBuiltinAggregations.Sum<Date>(fieldType);
+ case DECIMAL:
+ return new BeamBuiltinAggregations.Sum<BigDecimal>(fieldType);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("[%s] is not support in SUM", fieldType));
+ }
+ }
+
+ private SqlTypeName fieldType;
+ private Sum(SqlTypeName fieldType) {
+ this.fieldType = fieldType;
+ }
+
+ @Override
+ public BigDecimal init() {
+ return new BigDecimal(0);
+ }
+
+ @Override
+ public BigDecimal add(BigDecimal accumulator, T input) {
+ return accumulator.add(new BigDecimal(input.toString()));
+ }
+
+ @Override
+ public BigDecimal merge(Iterable<BigDecimal> accumulators) {
+ BigDecimal v = new BigDecimal(0);
+ Iterator<BigDecimal> ite = accumulators.iterator();
+ while (ite.hasNext()) {
+ v = v.add(ite.next());
+ }
+ return v;
+ }
+
+ @Override
+ public T result(BigDecimal accumulator) {
+ Object result = null;
+ switch (fieldType) {
+ case INTEGER:
+ result = accumulator.intValue();
+ break;
+ case BIGINT:
+ result = accumulator.longValue();
+ break;
+ case SMALLINT:
+ result = accumulator.shortValue();
+ break;
+ case TINYINT:
+ result = accumulator.byteValue();
+ break;
+ case DOUBLE:
+ result = accumulator.doubleValue();
+ break;
+ case FLOAT:
+ result = accumulator.floatValue();
+ break;
+ case DECIMAL:
+ result = accumulator;
+ break;
+ default:
+ // NOTE(review): TIMESTAMP reaches here (create() accepts it above) and silently
+ // yields null — either reject TIMESTAMP in create() or handle it explicitly.
+ break;
+ }
+ return (T) result;
+ }
+ }
+
+ /**
+ * Built-in aggregation for AVG.
+ *
+ * <p>The accumulator is a (running sum, count) pair; the result is sum / count,
+ * narrowed to the column's Java type.
+ */
+ public static final class Avg<T> extends BeamSqlUdaf<T, KV<BigDecimal, Long>, T> {
+ public static Avg create(SqlTypeName fieldType) {
+ switch (fieldType) {
+ case INTEGER:
+ return new BeamBuiltinAggregations.Avg<Integer>(fieldType);
+ case SMALLINT:
+ return new BeamBuiltinAggregations.Avg<Short>(fieldType);
+ case TINYINT:
+ return new BeamBuiltinAggregations.Avg<Byte>(fieldType);
+ case BIGINT:
+ return new BeamBuiltinAggregations.Avg<Long>(fieldType);
+ case FLOAT:
+ return new BeamBuiltinAggregations.Avg<Float>(fieldType);
+ case DOUBLE:
+ return new BeamBuiltinAggregations.Avg<Double>(fieldType);
+ case TIMESTAMP:
+ return new BeamBuiltinAggregations.Avg<Date>(fieldType);
+ case DECIMAL:
+ return new BeamBuiltinAggregations.Avg<BigDecimal>(fieldType);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("[%s] is not support in AVG", fieldType));
+ }
+ }
+
+ private SqlTypeName fieldType;
+ private Avg(SqlTypeName fieldType) {
+ this.fieldType = fieldType;
+ }
+
+ @Override
+ public KV<BigDecimal, Long> init() {
+ return KV.of(new BigDecimal(0), 0L);
+ }
+
+ @Override
+ public KV<BigDecimal, Long> add(KV<BigDecimal, Long> accumulator, T input) {
+ return KV.of(
+ accumulator.getKey().add(new BigDecimal(input.toString())),
+ accumulator.getValue() + 1);
+ }
+
+ @Override
+ public KV<BigDecimal, Long> merge(Iterable<KV<BigDecimal, Long>> accumulators) {
+ BigDecimal v = new BigDecimal(0);
+ long s = 0;
+ Iterator<KV<BigDecimal, Long>> ite = accumulators.iterator();
+ while (ite.hasNext()) {
+ KV<BigDecimal, Long> r = ite.next();
+ v = v.add(r.getKey());
+ s += r.getValue();
+ }
+ return KV.of(v, s);
+ }
+
+ @Override
+ public T result(KV<BigDecimal, Long> accumulator) {
+ // NOTE(review): BigDecimal.divide(BigDecimal) with no rounding mode throws
+ // ArithmeticException for non-terminating quotients (e.g. AVG over 3 rows summing
+ // to 1 => 1/3); use divide(divisor, scale, RoundingMode) or a MathContext.
+ // Also divides by zero when count == 0 (empty input) — confirm that cannot occur.
+ BigDecimal decimalAvg = accumulator.getKey().divide(
+ new BigDecimal(accumulator.getValue()));
+ Object result = null;
+ switch (fieldType) {
+ case INTEGER:
+ result = decimalAvg.intValue();
+ break;
+ case BIGINT:
+ result = decimalAvg.longValue();
+ break;
+ case SMALLINT:
+ result = decimalAvg.shortValue();
+ break;
+ case TINYINT:
+ result = decimalAvg.byteValue();
+ break;
+ case DOUBLE:
+ result = decimalAvg.doubleValue();
+ break;
+ case FLOAT:
+ result = decimalAvg.floatValue();
+ break;
+ case DECIMAL:
+ result = decimalAvg;
+ break;
+ default:
+ // NOTE(review): TIMESTAMP reaches here (create() accepts it) and yields null.
+ break;
+ }
+ return (T) result;
+ }
+
+ @Override
+ public Coder<KV<BigDecimal, Long>> getAccumulatorCoder(CoderRegistry registry)
+ throws CannotProvideCoderException {
+ return KvCoder.of(BigDecimalCoder.of(), VarLongCoder.of());
+ }
+ }
+
+ /**
+ * Find {@link Coder} for Beam SQL field types.
+ */
+ private static Coder getSqlTypeCoder(SqlTypeName sqlType) {
+ switch (sqlType) {
+ case INTEGER:
+ return VarIntCoder.of();
+ case SMALLINT:
+ return SerializableCoder.of(Short.class);
+ case TINYINT:
+ return ByteCoder.of();
+ case BIGINT:
+ return VarLongCoder.of();
+ case FLOAT:
+ return SerializableCoder.of(Float.class);
+ case DOUBLE:
+ return DoubleCoder.of();
+ case TIMESTAMP:
+ return SerializableCoder.of(Date.class);
+ case DECIMAL:
+ return BigDecimalCoder.of();
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Cannot find a Coder for data type [%s]", sqlType));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
new file mode 100644
index 0000000..9ea4376
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.transform;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.util.Pair;
+
+/**
+ * Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation.
+ */
+public class BeamJoinTransforms {
+
+ /**
+ * A {@code SimpleFunction} to extract join fields from the specified row.
+ *
+ * <p>Produces {@code KV<joinKey, originalRow>}; the key row's column names are
+ * synthetic ("c0", "c1", ...) so both sides of the join yield equal keys.
+ */
+ public static class ExtractJoinFields
+ extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
+ private final boolean isLeft;
+ private final List<Pair<Integer, Integer>> joinColumns;
+
+ public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumns) {
+ this.isLeft = isLeft;
+ this.joinColumns = joinColumns;
+ }
+
+ @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+ // build the type
+ // the name of the join field is not important
+ List<String> names = new ArrayList<>(joinColumns.size());
+ List<Integer> types = new ArrayList<>(joinColumns.size());
+ for (int i = 0; i < joinColumns.size(); i++) {
+ names.add("c" + i);
+ // Pair key = left-side column index, Pair value = right-side column index.
+ types.add(isLeft
+ ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) :
+ input.getDataType().getFieldsType().get(joinColumns.get(i).getValue()));
+ }
+ BeamSqlRowType type = BeamSqlRowType.create(names, types);
+
+ // build the row
+ BeamSqlRow row = new BeamSqlRow(type);
+ for (int i = 0; i < joinColumns.size(); i++) {
+ row.addField(i, input
+ .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue()));
+ }
+ return KV.of(row, input);
+ }
+ }
+
+
+ /**
+ * A {@code DoFn} which implement the sideInput-JOIN.
+ *
+ * <p>The right-hand relation is materialized as a side-input multimap keyed by the
+ * join key; unmatched left rows are padded with {@code rightNullRow} for LEFT joins.
+ */
+ public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
+ private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView;
+ private final JoinRelType joinType;
+ private final BeamSqlRow rightNullRow;
+ // When true, the "left"/"right" roles were flipped upstream; un-flip in the output.
+ private final boolean swap;
+
+ public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow,
+ PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView,
+ boolean swap) {
+ this.joinType = joinType;
+ this.rightNullRow = rightNullRow;
+ this.sideInputView = sideInputView;
+ this.swap = swap;
+ }
+
+ @ProcessElement public void processElement(ProcessContext context) {
+ BeamSqlRow key = context.element().getKey();
+ BeamSqlRow leftRow = context.element().getValue();
+ Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView);
+ Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key);
+
+ if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) {
+ // One output row per matching right-side row.
+ Iterator<BeamSqlRow> it = rightRowsIterable.iterator();
+ while (it.hasNext()) {
+ context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap));
+ }
+ } else {
+ // No match: LEFT join still emits the left row padded with nulls.
+ if (joinType == JoinRelType.LEFT) {
+ context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap));
+ }
+ }
+ }
+ }
+
+
+ /**
+ * A {@code SimpleFunction} to combine two rows into one.
+ */
+ public static class JoinParts2WholeRow
+ extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> {
+ @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) {
+ KV<BeamSqlRow, BeamSqlRow> parts = input.getValue();
+ BeamSqlRow leftRow = parts.getKey();
+ BeamSqlRow rightRow = parts.getValue();
+ return combineTwoRowsIntoOne(leftRow, rightRow, false);
+ }
+ }
+
+ /**
+ * As the method name suggests: combine two rows into one wide row.
+ */
+ private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow,
+ BeamSqlRow rightRow, boolean swap) {
+ if (swap) {
+ return combineTwoRowsIntoOneHelper(rightRow, leftRow);
+ } else {
+ return combineTwoRowsIntoOneHelper(leftRow, rightRow);
+ }
+ }
+
+ /**
+ * As the method name suggests: combine two rows into one wide row.
+ * Left-row fields first, then right-row fields, concatenating names and types.
+ */
+ private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow,
+ BeamSqlRow rightRow) {
+ // build the type
+ List<String> names = new ArrayList<>(leftRow.size() + rightRow.size());
+ names.addAll(leftRow.getDataType().getFieldsName());
+ names.addAll(rightRow.getDataType().getFieldsName());
+
+ List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
+ types.addAll(leftRow.getDataType().getFieldsType());
+ types.addAll(rightRow.getDataType().getFieldsType());
+ BeamSqlRowType type = BeamSqlRowType.create(names, types);
+
+ BeamSqlRow row = new BeamSqlRow(type);
+ // build the row
+ for (int i = 0; i < leftRow.size(); i++) {
+ row.addField(i, leftRow.getFieldValue(i));
+ }
+
+ for (int i = 0; i < rightRow.size(); i++) {
+ row.addField(i + leftRow.size(), rightRow.getFieldValue(i));
+ }
+
+ return row;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
new file mode 100644
index 0000000..a983cf5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.transform;
+
+import java.util.Iterator;
+
+import org.apache.beam.dsls.sql.rel.BeamSetOperatorRelBase;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Collections of {@code PTransform} and {@code DoFn} used to perform Set operations.
+ */
+public abstract class BeamSetOperatorsTransforms {
+ /**
+ * Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>}.
+ *
+ * <p>The whole row is its own key, so a CoGroupByKey groups identical rows together.
+ */
+ public static class BeamSqlRow2KvFn extends
+ SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
+ @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+ return KV.of(input, input);
+ }
+ }
+
+ /**
+ * Filter function used for Set operators.
+ *
+ * <p>For each distinct row (the CoGbk key), inspects which side(s) it appeared on
+ * and emits it per UNION / INTERSECT / MINUS semantics, honoring ALL vs DISTINCT.
+ */
+ public static class SetOperatorFilteringDoFn extends
+ DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> {
+ private TupleTag<BeamSqlRow> leftTag;
+ private TupleTag<BeamSqlRow> rightTag;
+ private BeamSetOperatorRelBase.OpType opType;
+ // ALL?
+ private boolean all;
+
+ public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag,
+ BeamSetOperatorRelBase.OpType opType, boolean all) {
+ this.leftTag = leftTag;
+ this.rightTag = rightTag;
+ this.opType = opType;
+ this.all = all;
+ }
+
+ @ProcessElement public void processElement(ProcessContext ctx) {
+ CoGbkResult coGbkResult = ctx.element().getValue();
+ Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag);
+ Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag);
+ switch (opType) {
+ case UNION:
+ if (all) {
+ // output both left & right
+ Iterator<BeamSqlRow> iter = leftRows.iterator();
+ while (iter.hasNext()) {
+ ctx.output(iter.next());
+ }
+ iter = rightRows.iterator();
+ while (iter.hasNext()) {
+ ctx.output(iter.next());
+ }
+ } else {
+ // only output the key
+ ctx.output(ctx.element().getKey());
+ }
+ break;
+ case INTERSECT:
+ // Row must be present on both sides.
+ if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
+ if (all) {
+ for (BeamSqlRow leftRow : leftRows) {
+ ctx.output(leftRow);
+ }
+ } else {
+ ctx.output(ctx.element().getKey());
+ }
+ }
+ break;
+ case MINUS:
+ // Row present on the left but absent on the right.
+ if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) {
+ // A fresh iterator: the hasNext() checks above used separate iterators.
+ Iterator<BeamSqlRow> iter = leftRows.iterator();
+ if (all) {
+ // output all
+ while (iter.hasNext()) {
+ ctx.output(iter.next());
+ }
+ } else {
+ // only output one
+ ctx.output(iter.next());
+ }
+ }
+ // MINUS is the last case; no break needed.
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java
new file mode 100644
index 0000000..d4dbc6a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.transform;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step.
+ *
+ */
+/**
+ * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step: it
+ * evaluates the filter expression against each input row and forwards only the
+ * rows for which the expression evaluates to {@code true}.
+ */
+public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> {
+
+  private String stepName;
+  private BeamSqlExpressionExecutor executor;
+
+  public BeamSqlFilterFn(String stepName, BeamSqlExpressionExecutor executor) {
+    this.stepName = stepName;
+    this.executor = executor;
+  }
+
+  @Setup
+  public void setup() {
+    // Compile/prepare the filter expression once per DoFn instance.
+    executor.prepare();
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    BeamSqlRow row = c.element();
+    // The executor yields a single-element result; element 0 is the predicate value.
+    if ((Boolean) executor.execute(row).get(0)) {
+      c.output(row);
+    }
+  }
+
+  @Teardown
+  public void close() {
+    executor.close();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java
new file mode 100644
index 0000000..d8a2a63
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.transform;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * A test PTransform to display output in console.
+ *
+ */
+/**
+ * A test {@code DoFn} that prints each row's data values to the console.
+ */
+public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> {
+
+  // NOTE(review): stepName is stored but not currently included in the printed
+  // output — presumably kept for debugging/identification; confirm before removing.
+  private String stepName;
+
+  public BeamSqlOutputToConsoleFn(String stepName) {
+    this.stepName = stepName;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    System.out.println("Output: " + c.element().getDataValues());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
new file mode 100644
index 0000000..886ddcf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.transform;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.beam.dsls.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ *
+ * {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step.
+ *
+ */
+/**
+ * {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step: it
+ * evaluates the projection expressions against each input row and emits a new row
+ * of type {@code outputRowType}, carrying over the input row's window range.
+ */
+public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
+  private String stepName;
+  private BeamSqlExpressionExecutor executor;
+  private BeamSqlRowType outputRowType;
+
+  public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
+      BeamSqlRowType outputRowType) {
+    this.stepName = stepName;
+    this.executor = executor;
+    this.outputRowType = outputRowType;
+  }
+
+  @Setup
+  public void setup() {
+    // Compile/prepare the projection expressions once per DoFn instance.
+    executor.prepare();
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c, BoundedWindow window) {
+    BeamSqlRow input = c.element();
+
+    // Build the projected row and propagate the input's window metadata.
+    BeamSqlRow output = new BeamSqlRow(outputRowType);
+    output.updateWindowRange(input, window);
+
+    // One executor result per output column, auto-cast to the declared field type.
+    List<Object> fields = executor.execute(input);
+    for (int i = 0; i < fields.size(); i++) {
+      BeamTableUtils.addFieldWithAutoTypeCasting(output, i, fields.get(i));
+    }
+
+    c.output(output);
+  }
+
+  @Teardown
+  public void close() {
+    executor.close();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java
new file mode 100644
index 0000000..5169749
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSql pipeline.
+ */
+package org.apache.beam.dsls.sql.transform;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
new file mode 100644
index 0000000..4b8696b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.utils;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Utility methods for Calcite related operations.
+ */
+/**
+ * Utility methods for Calcite related operations: bidirectional mapping between
+ * {@code java.sql.Types} integer codes and Calcite {@code SqlTypeName}, plus
+ * conversion between {@code BeamSqlRowType} and Calcite row types.
+ */
+public class CalciteUtils {
+  private static final Map<Integer, SqlTypeName> JAVA_TO_CALCITE_MAPPING = new HashMap<>();
+  private static final Map<SqlTypeName, Integer> CALCITE_TO_JAVA_MAPPING = new HashMap<>();
+  static {
+    JAVA_TO_CALCITE_MAPPING.put(Types.TINYINT, SqlTypeName.TINYINT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.SMALLINT, SqlTypeName.SMALLINT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.INTEGER, SqlTypeName.INTEGER);
+    JAVA_TO_CALCITE_MAPPING.put(Types.BIGINT, SqlTypeName.BIGINT);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.FLOAT, SqlTypeName.FLOAT);
+    JAVA_TO_CALCITE_MAPPING.put(Types.DOUBLE, SqlTypeName.DOUBLE);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.DECIMAL, SqlTypeName.DECIMAL);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR);
+    JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE);
+    JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME);
+    JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP);
+
+    JAVA_TO_CALCITE_MAPPING.put(Types.BOOLEAN, SqlTypeName.BOOLEAN);
+
+    // The reverse mapping is derived from the forward one so the two stay in sync.
+    for (Map.Entry<Integer, SqlTypeName> pair : JAVA_TO_CALCITE_MAPPING.entrySet()) {
+      CALCITE_TO_JAVA_MAPPING.put(pair.getValue(), pair.getKey());
+    }
+  }
+
+  // Utility class with only static members; prevent accidental instantiation.
+  private CalciteUtils() {
+  }
+
+  /**
+   * Get the corresponding {@code SqlTypeName} for an integer sql type
+   * ({@code java.sql.Types} constant); returns {@code null} for unmapped types.
+   */
+  public static SqlTypeName toCalciteType(int type) {
+    return JAVA_TO_CALCITE_MAPPING.get(type);
+  }
+
+  /**
+   * Get the integer sql type from Calcite {@code SqlTypeName}; returns
+   * {@code null} for unmapped type names.
+   */
+  public static Integer toJavaType(SqlTypeName typeName) {
+    return CALCITE_TO_JAVA_MAPPING.get(typeName);
+  }
+
+  /**
+   * Get the {@code SqlTypeName} for the specified column of a table.
+   */
+  public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) {
+    return toCalciteType(schema.getFieldsType().get(index));
+  }
+
+  /**
+   * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
+   */
+  public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) {
+    List<String> fieldNames = new ArrayList<>();
+    List<Integer> fieldTypes = new ArrayList<>();
+    for (RelDataTypeField f : tableInfo.getFieldList()) {
+      fieldNames.add(f.getName());
+      fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
+    }
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
+  }
+
+  /**
+   * Create an instance of {@code RelProtoDataType} so it can be used to create a table.
+   */
+  public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) {
+    return new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a) {
+        RelDataTypeFactory.FieldInfoBuilder builder = a.builder();
+        for (int idx = 0; idx < that.getFieldsName().size(); ++idx) {
+          builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx)));
+        }
+        return builder.build();
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java
new file mode 100644
index 0000000..b5c861a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes.
+ */
+package org.apache.beam.dsls.sql.utils;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/resources/log4j.properties b/sdks/java/extensions/sql/src/main/resources/log4j.properties
new file mode 100644
index 0000000..709484b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=ERROR,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
\ No newline at end of file