You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:39 UTC
[16/59] beam git commit: move dsls/sql to sdks/java/extensions/sql
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
deleted file mode 100644
index d419473..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.schema;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * This interface defines a Beam Sql Table.
- */
-public interface BeamSqlTable {
- /**
- * In Beam SQL, there's no difference between a batch query and a streaming
- * query. {@link BeamIOType} is used to validate the sources.
- */
- BeamIOType getSourceType();
-
- /**
- * create a {@code PCollection<BeamSqlRow>} from source.
- *
- */
- PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
-
- /**
- * Creates an {@code IO.write()} instance to write to the target.
- *
- */
- PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
-
- /**
- * Get the schema info of the table.
- */
- BeamSqlRowType getRowType();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java
deleted file mode 100644
index 9582ffa..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdaf.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import java.lang.reflect.ParameterizedType;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * abstract class of aggregation functions in Beam SQL.
- *
- * <p>There are several constraints for a UDAF:<br>
- * 1. A constructor with an empty argument list is required;<br>
- * 2. The type of {@code InputT} and {@code OutputT} can only be Integer/Long/Short/Byte/Double
- * /Float/Date/BigDecimal, mapping as SQL type INTEGER/BIGINT/SMALLINT/TINYINT/DOUBLE/FLOAT
- * /TIMESTAMP/DECIMAL;<br>
- * 3. Keep intermediate data in {@code AccumT}, and do not rely on fields of the class;<br>
- */
-public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
- public BeamSqlUdaf(){}
-
- /**
- * create an initial aggregation object, equals to {@link CombineFn#createAccumulator()}.
- */
- public abstract AccumT init();
-
- /**
- * add an input value, equals to {@link CombineFn#addInput(Object, Object)}.
- */
- public abstract AccumT add(AccumT accumulator, InputT input);
-
- /**
- * merge aggregation objects from parallel tasks, equals to
- * {@link CombineFn#mergeAccumulators(Iterable)}.
- */
- public abstract AccumT merge(Iterable<AccumT> accumulators);
-
- /**
- * extract output value from aggregation object, equals to
- * {@link CombineFn#extractOutput(Object)}.
- */
- public abstract OutputT result(AccumT accumulator);
-
- /**
- * get the coder for AccumT which stores the intermediate result.
- * By default it's fetched from {@link CoderRegistry}.
- */
- public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry)
- throws CannotProvideCoderException {
- return registry.getCoder(
- (Class<AccumT>) ((ParameterizedType) getClass()
- .getGenericSuperclass()).getActualTypeArguments()[1]);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
deleted file mode 100644
index 2066353..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlUdf.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Interface to create a UDF in Beam SQL.
- *
- * <p>A static method {@code eval} is required. Here is an example:
- *
- * <blockquote><pre>
- * public static class MyLeftFunction {
- * public static String eval(
- * @Parameter(name = "s") String s,
- * @Parameter(name = "n", optional = true) Integer n) {
- * return s.substring(0, n == null ? 1 : n);
- * }
- * }</pre></blockquote>
- *
- * <p>The first parameter is named "s" and is mandatory,
- * and the second parameter is named "n" and is optional.
- */
-public interface BeamSqlUdf extends Serializable {
- String UDF_METHOD = "eval";
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
deleted file mode 100644
index 4b7e76b..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.math.BigDecimal;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.NlsString;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVPrinter;
-import org.apache.commons.csv.CSVRecord;
-
-/**
- * Utility methods for working with {@code BeamTable}.
- */
-public final class BeamTableUtils {
- /**
-  * Parses one CSV-formatted line into a {@link BeamSqlRow} of the given row type.
-  *
-  * <p>Only the first record produced by the parser is used. Each raw field is passed to
-  * {@link #addFieldWithAutoTypeCasting(BeamSqlRow, int, Object)} so that it is stored with
-  * the type declared for its column.
-  *
-  * @param csvFormat the CSV dialect used for parsing
-  * @param line the text to parse
-  * @param beamSqlRowType the row type describing the expected columns
-  * @return the populated row
-  * @throws IllegalArgumentException if the line yields no record, if the number of fields
-  *     differs from the row type, or if the parser reports an {@link IOException}
-  */
- public static BeamSqlRow csvLine2BeamSqlRow(
-     CSVFormat csvFormat,
-     String line,
-     BeamSqlRowType beamSqlRowType) {
-   BeamSqlRow row = new BeamSqlRow(beamSqlRowType);
-   // Closing the parser also releases the underlying reader.
-   try (CSVParser parser = csvFormat.parse(new StringReader(line))) {
-     java.util.List<CSVRecord> records = parser.getRecords();
-     // A line without any record (for example an empty line) would otherwise fail with a
-     // bare IndexOutOfBoundsException that does not say what went wrong.
-     if (records.isEmpty()) {
-       throw new IllegalArgumentException(String.format(
-           "Expect %d fields, but the line contains no CSV record",
-           beamSqlRowType.size()));
-     }
-     CSVRecord rawRecord = records.get(0);
-
-     if (rawRecord.size() != beamSqlRowType.size()) {
-       throw new IllegalArgumentException(String.format(
-           "Expect %d fields, but actually %d",
-           beamSqlRowType.size(), rawRecord.size()
-       ));
-     }
-     for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
-       String raw = rawRecord.get(idx);
-       addFieldWithAutoTypeCasting(row, idx, raw);
-     }
-   } catch (IOException e) {
-     throw new IllegalArgumentException("decodeRecord failed!", e);
-   }
-   return row;
- }
-
- /**
-  * Encodes a {@link BeamSqlRow} as CSV text using {@code csvFormat}.
-  *
-  * <p>Non-null field values are printed via their {@code toString()} form. A {@code null}
-  * field is passed to the printer as {@code null} instead of being dereferenced, so rows
-  * that contain nulls no longer fail with a {@link NullPointerException}.
-  *
-  * @param row the row to encode
-  * @param csvFormat the CSV dialect used for printing
-  * @return the printed record, including whatever {@code println()} appends
-  * @throws IllegalArgumentException if the printer reports an {@link IOException}
-  */
- public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) {
-   StringWriter writer = new StringWriter();
-   try (CSVPrinter printer = csvFormat.print(writer)) {
-     for (int i = 0; i < row.size(); i++) {
-       Object value = row.getFieldValue(i);
-       // addFieldWithAutoTypeCasting stores null for missing values, so guard before
-       // calling toString(); the printer decides how a null is rendered.
-       printer.print(value == null ? null : value.toString());
-     }
-     printer.println();
-   } catch (IOException e) {
-     throw new IllegalArgumentException("encodeRecord failed!", e);
-   }
-   return writer.toString();
- }
-
- public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
- if (rawObj == null) {
- row.addField(idx, null);
- return;
- }
-
- SqlTypeName columnType = CalciteUtils.getFieldType(row.getDataType(), idx);
- // auto-casting for numerics
- if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
- || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
- String raw = rawObj.toString();
- switch (columnType) {
- case TINYINT:
- row.addField(idx, Byte.valueOf(raw));
- break;
- case SMALLINT:
- row.addField(idx, Short.valueOf(raw));
- break;
- case INTEGER:
- row.addField(idx, Integer.valueOf(raw));
- break;
- case BIGINT:
- row.addField(idx, Long.valueOf(raw));
- break;
- case FLOAT:
- row.addField(idx, Float.valueOf(raw));
- break;
- case DOUBLE:
- row.addField(idx, Double.valueOf(raw));
- break;
- default:
- throw new UnsupportedOperationException(
- String.format("Column type %s is not supported yet!", columnType));
- }
- } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
- // convert NlsString to String
- if (rawObj instanceof NlsString) {
- row.addField(idx, ((NlsString) rawObj).getValue());
- } else {
- row.addField(idx, rawObj);
- }
- } else {
- // keep the origin
- row.addField(idx, rawObj);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
deleted file mode 100644
index a18f3de..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema.kafka;
-
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.csv.CSVFormat;
-
-/**
- * A Kafka topic that saves records as CSV format.
- *
- */
-public class BeamKafkaCSVTable extends BeamKafkaTable {
- private CSVFormat csvFormat;
- public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
- List<String> topics) {
- this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
- }
-
- public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
- List<String> topics, CSVFormat format) {
- super(beamSqlRowType, bootstrapServers, topics);
- this.csvFormat = format;
- }
-
- @Override
- public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
- getPTransformForInput() {
- return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
- }
-
- @Override
- public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
- getPTransformForOutput() {
- return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
- }
-
- /**
- * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}.
- *
- */
- public static class CsvRecorderDecoder
- extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
- private BeamSqlRowType rowType;
- private CSVFormat format;
- public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) {
- this.rowType = rowType;
- this.format = format;
- }
-
- @Override
- public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) {
- return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- String rowInString = new String(c.element().getValue());
- c.output(csvLine2BeamSqlRow(format, rowInString, rowType));
- }
- }));
- }
- }
-
- /**
- * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}.
- *
- */
- public static class CsvRecorderEncoder
- extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
- private BeamSqlRowType rowType;
- private CSVFormat format;
- public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) {
- this.rowType = rowType;
- this.format = format;
- }
-
- @Override
- public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) {
- return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- BeamSqlRow in = c.element();
- c.output(KV.of(new byte[] {}, beamSqlRow2CsvLine(in, format).getBytes()));
- }
- }));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
deleted file mode 100644
index faa2706..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema.kafka;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-
-/**
- * {@code BeamKafkaTable} represents a Kafka topic, as source or target. Subclasses must
- * implement the conversion between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
- *
- */
-public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
-
- private String bootstrapServers;
- private List<String> topics;
- private Map<String, Object> configUpdates;
-
- protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) {
- super(beamSqlRowType);
- }
-
- public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
- List<String> topics) {
- super(beamSqlRowType);
- this.bootstrapServers = bootstrapServers;
- this.topics = topics;
- }
-
- public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
- this.configUpdates = configUpdates;
- return this;
- }
-
- @Override
- public BeamIOType getSourceType() {
- return BeamIOType.UNBOUNDED;
- }
-
- public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
- getPTransformForInput();
-
- public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
- getPTransformForOutput();
-
- @Override
- public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
- return PBegin.in(pipeline).apply("read",
- KafkaIO.<byte[], byte[]>read()
- .withBootstrapServers(bootstrapServers)
- .withTopics(topics)
- .updateConsumerProperties(configUpdates)
- .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
- .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
- .withoutMetadata())
- .apply("in_format", getPTransformForInput());
- }
-
- @Override
- public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
- checkArgument(topics != null && topics.size() == 1,
- "Only one topic can be acceptable as output.");
-
- return new PTransform<PCollection<BeamSqlRow>, PDone>() {
- @Override
- public PDone expand(PCollection<BeamSqlRow> input) {
- return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
- KafkaIO.<byte[], byte[]>write()
- .withBootstrapServers(bootstrapServers)
- .withTopic(topics.get(0))
- .withKeySerializer(ByteArraySerializer.class)
- .withValueSerializer(ByteArraySerializer.class));
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java
deleted file mode 100644
index 0418372..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * table schema for KafkaIO.
- */
-package org.apache.beam.dsls.sql.schema.kafka;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java
deleted file mode 100644
index 4c41826..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * define table schema, to map with Beam IO components.
- *
- */
-package org.apache.beam.dsls.sql.schema;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
deleted file mode 100644
index 9ed56b4..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.schema.text;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.commons.csv.CSVFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@code BeamTextCSVTable} is a {@code BeamTextTable} that is formatted as CSV.
- *
- * <p>
- * {@link CSVFormat} itself has many dialects, check its javadoc for more info.
- * </p>
- */
-public class BeamTextCSVTable extends BeamTextTable {
- private static final Logger LOG = LoggerFactory
- .getLogger(BeamTextCSVTable.class);
-
- private CSVFormat csvFormat;
-
- /**
- * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
- */
- public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern) {
- this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
- }
-
- public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern,
- CSVFormat csvFormat) {
- super(beamSqlRowType, filePattern);
- this.csvFormat = csvFormat;
- }
-
- @Override
- public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
- return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
- .apply("parseCSVLine",
- new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
- }
-
- @Override
- public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
- return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
deleted file mode 100644
index 874c3e4..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.schema.text;
-
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
-
-import java.io.Serializable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.csv.CSVFormat;
-
-/**
- * IOReader for {@code BeamTextCSVTable}.
- */
-public class BeamTextCSVTableIOReader
- extends PTransform<PCollection<String>, PCollection<BeamSqlRow>>
- implements Serializable {
- private String filePattern;
- protected BeamSqlRowType beamSqlRowType;
- protected CSVFormat csvFormat;
-
- public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern,
- CSVFormat csvFormat) {
- this.filePattern = filePattern;
- this.beamSqlRowType = beamSqlRowType;
- this.csvFormat = csvFormat;
- }
-
- @Override
- public PCollection<BeamSqlRow> expand(PCollection<String> input) {
- return input.apply(ParDo.of(new DoFn<String, BeamSqlRow>() {
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- String str = ctx.element();
- ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType));
- }
- }));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
deleted file mode 100644
index f61bb71..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.schema.text;
-
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
-
-import java.io.Serializable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.commons.csv.CSVFormat;
-
-/**
- * IOWriter for {@code BeamTextCSVTable}.
- */
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
- implements Serializable {
- private String filePattern;
- protected BeamSqlRowType beamSqlRowType;
- protected CSVFormat csvFormat;
-
- public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern,
- CSVFormat csvFormat) {
- this.filePattern = filePattern;
- this.beamSqlRowType = beamSqlRowType;
- this.csvFormat = csvFormat;
- }
-
- @Override public PDone expand(PCollection<BeamSqlRow> input) {
- return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, String>() {
-
- @ProcessElement public void processElement(ProcessContext ctx) {
- BeamSqlRow row = ctx.element();
- ctx.output(beamSqlRow2CsvLine(row, csvFormat));
- }
- })).apply(TextIO.write().to(filePattern));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
deleted file mode 100644
index 6dc6cd0..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.schema.text;
-
-import java.io.Serializable;
-
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-
-/**
- * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
- */
-public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
- protected String filePattern;
-
- protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) {
- super(beamSqlRowType);
- this.filePattern = filePattern;
- }
-
- @Override
- public BeamIOType getSourceType() {
- return BeamIOType.BOUNDED;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
deleted file mode 100644
index f48f2fe..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Table schema for text files.
- */
-package org.apache.beam.dsls.sql.schema.text;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
deleted file mode 100644
index 5b21765..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.transform;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.KV;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.schema.impl.AggregateFunctionImpl;
-import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.joda.time.Instant;
-
-/**
- * Collections of {@code PTransform} and {@code DoFn} used to perform GROUP-BY operation.
- */
-public class BeamAggregationTransforms implements Serializable{
- /**
- * Merge KV to single record.
- */
- public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
- private BeamSqlRowType outRowType;
- private List<String> aggFieldNames;
- private int windowStartFieldIdx;
-
- public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList
- , int windowStartFieldIdx) {
- this.outRowType = outRowType;
- this.aggFieldNames = new ArrayList<>();
- for (AggregateCall ac : aggList) {
- aggFieldNames.add(ac.getName());
- }
- this.windowStartFieldIdx = windowStartFieldIdx;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) {
- BeamSqlRow outRecord = new BeamSqlRow(outRowType);
- outRecord.updateWindowRange(c.element().getKey(), window);
-
- KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
- for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
- outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
- }
- for (int idx = 0; idx < aggFieldNames.size(); ++idx) {
- outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx));
- }
- if (windowStartFieldIdx != -1) {
- outRecord.addField(windowStartFieldIdx, outRecord.getWindowStart().toDate());
- }
-
- c.output(outRecord);
- }
- }
-
- /**
- * extract group-by fields.
- */
- public static class AggregationGroupByKeyFn
- implements SerializableFunction<BeamSqlRow, BeamSqlRow> {
- private List<Integer> groupByKeys;
-
- public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
- this.groupByKeys = new ArrayList<>();
- for (int i : groupSet.asList()) {
- if (i != windowFieldIdx) {
- groupByKeys.add(i);
- }
- }
- }
-
- @Override
- public BeamSqlRow apply(BeamSqlRow input) {
- BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType());
- BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
- keyOfRecord.updateWindowRange(input, null);
-
- for (int idx = 0; idx < groupByKeys.size(); ++idx) {
- keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
- }
- return keyOfRecord;
- }
-
- private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
- List<String> fieldNames = new ArrayList<>();
- List<Integer> fieldTypes = new ArrayList<>();
- for (int idx : groupByKeys) {
- fieldNames.add(dataType.getFieldsName().get(idx));
- fieldTypes.add(dataType.getFieldsType().get(idx));
- }
- return BeamSqlRowType.create(fieldNames, fieldTypes);
- }
- }
-
- /**
- * Assign event timestamp.
- */
- public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> {
- private int windowFieldIdx = -1;
-
- public WindowTimestampFn(int windowFieldIdx) {
- super();
- this.windowFieldIdx = windowFieldIdx;
- }
-
- @Override
- public Instant apply(BeamSqlRow input) {
- return new Instant(input.getDate(windowFieldIdx).getTime());
- }
- }
-
- /**
- * An adaptor class to invoke Calcite UDAF instances in Beam {@code CombineFn}.
- */
- public static class AggregationAdaptor
- extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> {
- private List<BeamSqlUdaf> aggregators;
- private List<BeamSqlExpression> sourceFieldExps;
- private BeamSqlRowType finalRowType;
-
- public AggregationAdaptor(List<AggregateCall> aggregationCalls,
- BeamSqlRowType sourceRowType) {
- aggregators = new ArrayList<>();
- sourceFieldExps = new ArrayList<>();
- List<String> outFieldsName = new ArrayList<>();
- List<Integer> outFieldsType = new ArrayList<>();
- for (AggregateCall call : aggregationCalls) {
- int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0;
- BeamSqlExpression sourceExp = new BeamSqlInputRefExpression(
- CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex);
- sourceFieldExps.add(sourceExp);
-
- outFieldsName.add(call.name);
- int outFieldType = CalciteUtils.toJavaType(call.type.getSqlTypeName());
- outFieldsType.add(outFieldType);
-
- switch (call.getAggregation().getName()) {
- case "COUNT":
- aggregators.add(new BeamBuiltinAggregations.Count());
- break;
- case "MAX":
- aggregators.add(BeamBuiltinAggregations.Max.create(call.type.getSqlTypeName()));
- break;
- case "MIN":
- aggregators.add(BeamBuiltinAggregations.Min.create(call.type.getSqlTypeName()));
- break;
- case "SUM":
- aggregators.add(BeamBuiltinAggregations.Sum.create(call.type.getSqlTypeName()));
- break;
- case "AVG":
- aggregators.add(BeamBuiltinAggregations.Avg.create(call.type.getSqlTypeName()));
- break;
- default:
- if (call.getAggregation() instanceof SqlUserDefinedAggFunction) {
- // handle UDAF.
- SqlUserDefinedAggFunction udaf = (SqlUserDefinedAggFunction) call.getAggregation();
- AggregateFunctionImpl fn = (AggregateFunctionImpl) udaf.function;
- try {
- aggregators.add((BeamSqlUdaf) fn.declaringClass.newInstance());
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- } else {
- throw new UnsupportedOperationException(
- String.format("Aggregator [%s] is not supported",
- call.getAggregation().getName()));
- }
- break;
- }
- }
- finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType);
- }
- @Override
- public AggregationAccumulator createAccumulator() {
- AggregationAccumulator initialAccu = new AggregationAccumulator();
- for (BeamSqlUdaf agg : aggregators) {
- initialAccu.accumulatorElements.add(agg.init());
- }
- return initialAccu;
- }
- @Override
- public AggregationAccumulator addInput(AggregationAccumulator accumulator, BeamSqlRow input) {
- AggregationAccumulator deltaAcc = new AggregationAccumulator();
- for (int idx = 0; idx < aggregators.size(); ++idx) {
- deltaAcc.accumulatorElements.add(
- aggregators.get(idx).add(accumulator.accumulatorElements.get(idx),
- sourceFieldExps.get(idx).evaluate(input).getValue()));
- }
- return deltaAcc;
- }
- @Override
- public AggregationAccumulator mergeAccumulators(Iterable<AggregationAccumulator> accumulators) {
- AggregationAccumulator deltaAcc = new AggregationAccumulator();
- for (int idx = 0; idx < aggregators.size(); ++idx) {
- List accs = new ArrayList<>();
- Iterator<AggregationAccumulator> ite = accumulators.iterator();
- while (ite.hasNext()) {
- accs.add(ite.next().accumulatorElements.get(idx));
- }
- deltaAcc.accumulatorElements.add(aggregators.get(idx).merge(accs));
- }
- return deltaAcc;
- }
- @Override
- public BeamSqlRow extractOutput(AggregationAccumulator accumulator) {
- BeamSqlRow result = new BeamSqlRow(finalRowType);
- for (int idx = 0; idx < aggregators.size(); ++idx) {
- result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
- }
- return result;
- }
- @Override
- public Coder<AggregationAccumulator> getAccumulatorCoder(
- CoderRegistry registry, Coder<BeamSqlRow> inputCoder)
- throws CannotProvideCoderException {
- registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of());
- List<Coder> aggAccuCoderList = new ArrayList<>();
- for (BeamSqlUdaf udaf : aggregators) {
- aggAccuCoderList.add(udaf.getAccumulatorCoder(registry));
- }
- return new AggregationAccumulatorCoder(aggAccuCoderList);
- }
- }
-
- /**
- * A class to holder varied accumulator objects.
- */
- public static class AggregationAccumulator{
- private List accumulatorElements = new ArrayList<>();
- }
-
- /**
- * Coder for {@link AggregationAccumulator}.
- */
- public static class AggregationAccumulatorCoder extends CustomCoder<AggregationAccumulator>{
- private VarIntCoder sizeCoder = VarIntCoder.of();
- private List<Coder> elementCoders;
-
- public AggregationAccumulatorCoder(List<Coder> elementCoders) {
- this.elementCoders = elementCoders;
- }
-
- @Override
- public void encode(AggregationAccumulator value, OutputStream outStream)
- throws CoderException, IOException {
- sizeCoder.encode(value.accumulatorElements.size(), outStream);
- for (int idx = 0; idx < value.accumulatorElements.size(); ++idx) {
- elementCoders.get(idx).encode(value.accumulatorElements.get(idx), outStream);
- }
- }
-
- @Override
- public AggregationAccumulator decode(InputStream inStream) throws CoderException, IOException {
- AggregationAccumulator accu = new AggregationAccumulator();
- int size = sizeCoder.decode(inStream);
- for (int idx = 0; idx < size; ++idx) {
- accu.accumulatorElements.add(elementCoders.get(idx).decode(inStream));
- }
- return accu;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java
deleted file mode 100644
index fab2666..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamBuiltinAggregations.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.transform;
-
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.Iterator;
-import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.ByteCoder;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.DoubleCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Built-in aggregations functions for COUNT/MAX/MIN/SUM/AVG.
- */
-class BeamBuiltinAggregations {
- /**
- * Built-in aggregation for COUNT.
- */
- public static final class Count<T> extends BeamSqlUdaf<T, Long, Long> {
- public Count() {}
-
- @Override
- public Long init() {
- return 0L;
- }
-
- @Override
- public Long add(Long accumulator, T input) {
- return accumulator + 1;
- }
-
- @Override
- public Long merge(Iterable<Long> accumulators) {
- long v = 0L;
- Iterator<Long> ite = accumulators.iterator();
- while (ite.hasNext()) {
- v += ite.next();
- }
- return v;
- }
-
- @Override
- public Long result(Long accumulator) {
- return accumulator;
- }
- }
-
- /**
- * Built-in aggregation for MAX.
- */
- public static final class Max<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> {
- public static Max create(SqlTypeName fieldType) {
- switch (fieldType) {
- case INTEGER:
- return new BeamBuiltinAggregations.Max<Integer>(fieldType);
- case SMALLINT:
- return new BeamBuiltinAggregations.Max<Short>(fieldType);
- case TINYINT:
- return new BeamBuiltinAggregations.Max<Byte>(fieldType);
- case BIGINT:
- return new BeamBuiltinAggregations.Max<Long>(fieldType);
- case FLOAT:
- return new BeamBuiltinAggregations.Max<Float>(fieldType);
- case DOUBLE:
- return new BeamBuiltinAggregations.Max<Double>(fieldType);
- case TIMESTAMP:
- return new BeamBuiltinAggregations.Max<Date>(fieldType);
- case DECIMAL:
- return new BeamBuiltinAggregations.Max<BigDecimal>(fieldType);
- default:
- throw new UnsupportedOperationException(
- String.format("[%s] is not support in MAX", fieldType));
- }
- }
-
- private final SqlTypeName fieldType;
- private Max(SqlTypeName fieldType) {
- this.fieldType = fieldType;
- }
-
- @Override
- public T init() {
- return null;
- }
-
- @Override
- public T add(T accumulator, T input) {
- return (accumulator == null || accumulator.compareTo(input) < 0) ? input : accumulator;
- }
-
- @Override
- public T merge(Iterable<T> accumulators) {
- Iterator<T> ite = accumulators.iterator();
- T mergedV = ite.next();
- while (ite.hasNext()) {
- T v = ite.next();
- mergedV = mergedV.compareTo(v) > 0 ? mergedV : v;
- }
- return mergedV;
- }
-
- @Override
- public T result(T accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException {
- return BeamBuiltinAggregations.getSqlTypeCoder(fieldType);
- }
- }
-
- /**
- * Built-in aggregation for MIN.
- */
- public static final class Min<T extends Comparable<T>> extends BeamSqlUdaf<T, T, T> {
- public static Min create(SqlTypeName fieldType) {
- switch (fieldType) {
- case INTEGER:
- return new BeamBuiltinAggregations.Min<Integer>(fieldType);
- case SMALLINT:
- return new BeamBuiltinAggregations.Min<Short>(fieldType);
- case TINYINT:
- return new BeamBuiltinAggregations.Min<Byte>(fieldType);
- case BIGINT:
- return new BeamBuiltinAggregations.Min<Long>(fieldType);
- case FLOAT:
- return new BeamBuiltinAggregations.Min<Float>(fieldType);
- case DOUBLE:
- return new BeamBuiltinAggregations.Min<Double>(fieldType);
- case TIMESTAMP:
- return new BeamBuiltinAggregations.Min<Date>(fieldType);
- case DECIMAL:
- return new BeamBuiltinAggregations.Min<BigDecimal>(fieldType);
- default:
- throw new UnsupportedOperationException(
- String.format("[%s] is not support in MIN", fieldType));
- }
- }
-
- private final SqlTypeName fieldType;
- private Min(SqlTypeName fieldType) {
- this.fieldType = fieldType;
- }
-
- @Override
- public T init() {
- return null;
- }
-
- @Override
- public T add(T accumulator, T input) {
- return (accumulator == null || accumulator.compareTo(input) > 0) ? input : accumulator;
- }
-
- @Override
- public T merge(Iterable<T> accumulators) {
- Iterator<T> ite = accumulators.iterator();
- T mergedV = ite.next();
- while (ite.hasNext()) {
- T v = ite.next();
- mergedV = mergedV.compareTo(v) < 0 ? mergedV : v;
- }
- return mergedV;
- }
-
- @Override
- public T result(T accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder<T> getAccumulatorCoder(CoderRegistry registry) throws CannotProvideCoderException {
- return BeamBuiltinAggregations.getSqlTypeCoder(fieldType);
- }
- }
-
- /**
- * Built-in aggregation for SUM.
- */
- public static final class Sum<T> extends BeamSqlUdaf<T, BigDecimal, T> {
- public static Sum create(SqlTypeName fieldType) {
- switch (fieldType) {
- case INTEGER:
- return new BeamBuiltinAggregations.Sum<Integer>(fieldType);
- case SMALLINT:
- return new BeamBuiltinAggregations.Sum<Short>(fieldType);
- case TINYINT:
- return new BeamBuiltinAggregations.Sum<Byte>(fieldType);
- case BIGINT:
- return new BeamBuiltinAggregations.Sum<Long>(fieldType);
- case FLOAT:
- return new BeamBuiltinAggregations.Sum<Float>(fieldType);
- case DOUBLE:
- return new BeamBuiltinAggregations.Sum<Double>(fieldType);
- case TIMESTAMP:
- return new BeamBuiltinAggregations.Sum<Date>(fieldType);
- case DECIMAL:
- return new BeamBuiltinAggregations.Sum<BigDecimal>(fieldType);
- default:
- throw new UnsupportedOperationException(
- String.format("[%s] is not support in SUM", fieldType));
- }
- }
-
- private SqlTypeName fieldType;
- private Sum(SqlTypeName fieldType) {
- this.fieldType = fieldType;
- }
-
- @Override
- public BigDecimal init() {
- return new BigDecimal(0);
- }
-
- @Override
- public BigDecimal add(BigDecimal accumulator, T input) {
- return accumulator.add(new BigDecimal(input.toString()));
- }
-
- @Override
- public BigDecimal merge(Iterable<BigDecimal> accumulators) {
- BigDecimal v = new BigDecimal(0);
- Iterator<BigDecimal> ite = accumulators.iterator();
- while (ite.hasNext()) {
- v = v.add(ite.next());
- }
- return v;
- }
-
- @Override
- public T result(BigDecimal accumulator) {
- Object result = null;
- switch (fieldType) {
- case INTEGER:
- result = accumulator.intValue();
- break;
- case BIGINT:
- result = accumulator.longValue();
- break;
- case SMALLINT:
- result = accumulator.shortValue();
- break;
- case TINYINT:
- result = accumulator.byteValue();
- break;
- case DOUBLE:
- result = accumulator.doubleValue();
- break;
- case FLOAT:
- result = accumulator.floatValue();
- break;
- case DECIMAL:
- result = accumulator;
- break;
- default:
- break;
- }
- return (T) result;
- }
- }
-
- /**
- * Built-in aggregation for AVG.
- */
- public static final class Avg<T> extends BeamSqlUdaf<T, KV<BigDecimal, Long>, T> {
- public static Avg create(SqlTypeName fieldType) {
- switch (fieldType) {
- case INTEGER:
- return new BeamBuiltinAggregations.Avg<Integer>(fieldType);
- case SMALLINT:
- return new BeamBuiltinAggregations.Avg<Short>(fieldType);
- case TINYINT:
- return new BeamBuiltinAggregations.Avg<Byte>(fieldType);
- case BIGINT:
- return new BeamBuiltinAggregations.Avg<Long>(fieldType);
- case FLOAT:
- return new BeamBuiltinAggregations.Avg<Float>(fieldType);
- case DOUBLE:
- return new BeamBuiltinAggregations.Avg<Double>(fieldType);
- case TIMESTAMP:
- return new BeamBuiltinAggregations.Avg<Date>(fieldType);
- case DECIMAL:
- return new BeamBuiltinAggregations.Avg<BigDecimal>(fieldType);
- default:
- throw new UnsupportedOperationException(
- String.format("[%s] is not support in AVG", fieldType));
- }
- }
-
- private SqlTypeName fieldType;
- private Avg(SqlTypeName fieldType) {
- this.fieldType = fieldType;
- }
-
- @Override
- public KV<BigDecimal, Long> init() {
- return KV.of(new BigDecimal(0), 0L);
- }
-
- @Override
- public KV<BigDecimal, Long> add(KV<BigDecimal, Long> accumulator, T input) {
- return KV.of(
- accumulator.getKey().add(new BigDecimal(input.toString())),
- accumulator.getValue() + 1);
- }
-
- @Override
- public KV<BigDecimal, Long> merge(Iterable<KV<BigDecimal, Long>> accumulators) {
- BigDecimal v = new BigDecimal(0);
- long s = 0;
- Iterator<KV<BigDecimal, Long>> ite = accumulators.iterator();
- while (ite.hasNext()) {
- KV<BigDecimal, Long> r = ite.next();
- v = v.add(r.getKey());
- s += r.getValue();
- }
- return KV.of(v, s);
- }
-
- @Override
- public T result(KV<BigDecimal, Long> accumulator) {
- BigDecimal decimalAvg = accumulator.getKey().divide(
- new BigDecimal(accumulator.getValue()));
- Object result = null;
- switch (fieldType) {
- case INTEGER:
- result = decimalAvg.intValue();
- break;
- case BIGINT:
- result = decimalAvg.longValue();
- break;
- case SMALLINT:
- result = decimalAvg.shortValue();
- break;
- case TINYINT:
- result = decimalAvg.byteValue();
- break;
- case DOUBLE:
- result = decimalAvg.doubleValue();
- break;
- case FLOAT:
- result = decimalAvg.floatValue();
- break;
- case DECIMAL:
- result = decimalAvg;
- break;
- default:
- break;
- }
- return (T) result;
- }
-
- @Override
- public Coder<KV<BigDecimal, Long>> getAccumulatorCoder(CoderRegistry registry)
- throws CannotProvideCoderException {
- return KvCoder.of(BigDecimalCoder.of(), VarLongCoder.of());
- }
- }
-
- /**
- * Find {@link Coder} for Beam SQL field types.
- */
- private static Coder getSqlTypeCoder(SqlTypeName sqlType) {
- switch (sqlType) {
- case INTEGER:
- return VarIntCoder.of();
- case SMALLINT:
- return SerializableCoder.of(Short.class);
- case TINYINT:
- return ByteCoder.of();
- case BIGINT:
- return VarLongCoder.of();
- case FLOAT:
- return SerializableCoder.of(Float.class);
- case DOUBLE:
- return DoubleCoder.of();
- case TIMESTAMP:
- return SerializableCoder.of(Date.class);
- case DECIMAL:
- return BigDecimalCoder.of();
- default:
- throw new UnsupportedOperationException(
- String.format("Cannot find a Coder for data type [%s]", sqlType));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
deleted file mode 100644
index 9ea4376..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.transform;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.util.Pair;
-
-/**
- * Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation.
- */
-public class BeamJoinTransforms {
-
- /**
- * A {@code SimpleFunction} to extract join fields from the specified row.
- */
- public static class ExtractJoinFields
- extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
- private final boolean isLeft;
- private final List<Pair<Integer, Integer>> joinColumns;
-
- public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumns) {
- this.isLeft = isLeft;
- this.joinColumns = joinColumns;
- }
-
- @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
- // build the type
- // the name of the join field is not important
- List<String> names = new ArrayList<>(joinColumns.size());
- List<Integer> types = new ArrayList<>(joinColumns.size());
- for (int i = 0; i < joinColumns.size(); i++) {
- names.add("c" + i);
- types.add(isLeft
- ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) :
- input.getDataType().getFieldsType().get(joinColumns.get(i).getValue()));
- }
- BeamSqlRowType type = BeamSqlRowType.create(names, types);
-
- // build the row
- BeamSqlRow row = new BeamSqlRow(type);
- for (int i = 0; i < joinColumns.size(); i++) {
- row.addField(i, input
- .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue()));
- }
- return KV.of(row, input);
- }
- }
-
-
- /**
- * A {@code DoFn} which implement the sideInput-JOIN.
- */
- public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
- private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView;
- private final JoinRelType joinType;
- private final BeamSqlRow rightNullRow;
- private final boolean swap;
-
- public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow,
- PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView,
- boolean swap) {
- this.joinType = joinType;
- this.rightNullRow = rightNullRow;
- this.sideInputView = sideInputView;
- this.swap = swap;
- }
-
- @ProcessElement public void processElement(ProcessContext context) {
- BeamSqlRow key = context.element().getKey();
- BeamSqlRow leftRow = context.element().getValue();
- Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView);
- Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key);
-
- if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) {
- Iterator<BeamSqlRow> it = rightRowsIterable.iterator();
- while (it.hasNext()) {
- context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap));
- }
- } else {
- if (joinType == JoinRelType.LEFT) {
- context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap));
- }
- }
- }
- }
-
-
- /**
- * A {@code SimpleFunction} to combine two rows into one.
- */
- public static class JoinParts2WholeRow
- extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> {
- @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) {
- KV<BeamSqlRow, BeamSqlRow> parts = input.getValue();
- BeamSqlRow leftRow = parts.getKey();
- BeamSqlRow rightRow = parts.getValue();
- return combineTwoRowsIntoOne(leftRow, rightRow, false);
- }
- }
-
- /**
- * As the method name suggests: combine two rows into one wide row.
- */
- private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow,
- BeamSqlRow rightRow, boolean swap) {
- if (swap) {
- return combineTwoRowsIntoOneHelper(rightRow, leftRow);
- } else {
- return combineTwoRowsIntoOneHelper(leftRow, rightRow);
- }
- }
-
- /**
- * As the method name suggests: combine two rows into one wide row.
- */
- private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow,
- BeamSqlRow rightRow) {
- // build the type
- List<String> names = new ArrayList<>(leftRow.size() + rightRow.size());
- names.addAll(leftRow.getDataType().getFieldsName());
- names.addAll(rightRow.getDataType().getFieldsName());
-
- List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
- types.addAll(leftRow.getDataType().getFieldsType());
- types.addAll(rightRow.getDataType().getFieldsType());
- BeamSqlRowType type = BeamSqlRowType.create(names, types);
-
- BeamSqlRow row = new BeamSqlRow(type);
- // build the row
- for (int i = 0; i < leftRow.size(); i++) {
- row.addField(i, leftRow.getFieldValue(i));
- }
-
- for (int i = 0; i < rightRow.size(); i++) {
- row.addField(i + leftRow.size(), rightRow.getFieldValue(i));
- }
-
- return row;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
deleted file mode 100644
index a983cf5..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.transform;
-
-import java.util.Iterator;
-
-import org.apache.beam.dsls.sql.rel.BeamSetOperatorRelBase;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Building blocks ({@code SimpleFunction} and {@code DoFn}) used to perform Set operations.
- */
-public abstract class BeamSetOperatorsTransforms {
-  /**
-   * Maps a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>} whose key and value
-   * are both the input row.
-   */
-  public static class BeamSqlRow2KvFn extends
-      SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
-    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
-      KV<BeamSqlRow, BeamSqlRow> keyedRow = KV.of(input, input);
-      return keyedRow;
-    }
-  }
-
-  /**
-   * Filter function used for Set operators.
-   *
-   * <p>For every key it reads the rows stored under the left and right tags of the
-   * {@code CoGbkResult} and decides, based on the operator type and the {@code all} flag,
-   * which rows to emit.
-   */
-  public static class SetOperatorFilteringDoFn extends
-      DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> {
-    private TupleTag<BeamSqlRow> leftTag;
-    private TupleTag<BeamSqlRow> rightTag;
-    private BeamSetOperatorRelBase.OpType opType;
-    // true: emit the individual rows; false: emit a single row per key
-    private boolean all;
-
-    public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag,
-        BeamSetOperatorRelBase.OpType opType, boolean all) {
-      this.leftTag = leftTag;
-      this.rightTag = rightTag;
-      this.opType = opType;
-      this.all = all;
-    }
-
-    @ProcessElement public void processElement(ProcessContext ctx) {
-      CoGbkResult grouped = ctx.element().getValue();
-      Iterable<BeamSqlRow> leftRows = grouped.getAll(leftTag);
-      Iterable<BeamSqlRow> rightRows = grouped.getAll(rightTag);
-
-      switch (opType) {
-        case UNION:
-          if (!all) {
-            // only output the key
-            ctx.output(ctx.element().getKey());
-            break;
-          }
-          // output both left & right
-          emitAll(ctx, leftRows);
-          emitAll(ctx, rightRows);
-          break;
-        case INTERSECT:
-          if (!leftRows.iterator().hasNext() || !rightRows.iterator().hasNext()) {
-            // the key is missing on at least one side: nothing to emit
-            break;
-          }
-          if (all) {
-            emitAll(ctx, leftRows);
-          } else {
-            ctx.output(ctx.element().getKey());
-          }
-          break;
-        case MINUS:
-          if (!leftRows.iterator().hasNext() || rightRows.iterator().hasNext()) {
-            // either nothing on the left, or the key also exists on the right
-            break;
-          }
-          if (all) {
-            // output all
-            emitAll(ctx, leftRows);
-          } else {
-            // only output one
-            Iterator<BeamSqlRow> remaining = leftRows.iterator();
-            ctx.output(remaining.next());
-          }
-          break;
-      }
-    }
-
-    /**
-     * Outputs every row of {@code rows} in iteration order.
-     */
-    private void emitAll(ProcessContext ctx, Iterable<BeamSqlRow> rows) {
-      for (BeamSqlRow row : rows) {
-        ctx.output(row);
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java
deleted file mode 100644
index d4dbc6a..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.transform;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
-import org.apache.beam.dsls.sql.rel.BeamFilterRel;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step.
- *
- * <p>Each input row is evaluated by the supplied {@link BeamSqlExpressionExecutor}; the row
- * is forwarded only when the first evaluation result is {@code true}.
- */
-public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> {
-
-  private String stepName;
-  private BeamSqlExpressionExecutor executor;
-
-  /**
-   * @param stepName name of the step this function belongs to
-   * @param executor evaluates the filter condition against each row
-   */
-  public BeamSqlFilterFn(String stepName, BeamSqlExpressionExecutor executor) {
-    super();
-    this.stepName = stepName;
-    this.executor = executor;
-  }
-
-  @Setup
-  public void setup() {
-    executor.prepare();
-  }
-
-  /**
-   * Outputs the current element when the first value returned by the executor is
-   * {@code true}; otherwise the element is dropped.
-   */
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-    BeamSqlRow in = c.element();
-
-    List<Object> result = executor.execute(in);
-
-    // A null outcome is treated as "not true": the row is dropped instead of failing
-    // with a NullPointerException when the Boolean is unboxed.
-    Boolean matched = (Boolean) result.get(0);
-    if (matched != null && matched) {
-      c.output(in);
-    }
-  }
-
-  @Teardown
-  public void close() {
-    executor.close();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java
deleted file mode 100644
index d8a2a63..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.transform;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * A test PTransform to display output in console.
- *
- */
-public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> {
-
- private String stepName;
-
- public BeamSqlOutputToConsoleFn(String stepName) {
- super();
- this.stepName = stepName;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- System.out.println("Output: " + c.element().getDataValues());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
deleted file mode 100644
index 886ddcf..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.transform;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
-import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-
-/**
- *
- * {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step.
- *
- */
-public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
- private String stepName;
- private BeamSqlExpressionExecutor executor;
- private BeamSqlRowType outputRowType;
-
- public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
- BeamSqlRowType outputRowType) {
- super();
- this.stepName = stepName;
- this.executor = executor;
- this.outputRowType = outputRowType;
- }
-
- @Setup
- public void setup() {
- executor.prepare();
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) {
- BeamSqlRow inputRow = c.element();
- List<Object> results = executor.execute(inputRow);
-
- BeamSqlRow outRow = new BeamSqlRow(outputRowType);
- outRow.updateWindowRange(inputRow, window);
-
- for (int idx = 0; idx < results.size(); ++idx) {
- BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));
- }
-
- c.output(outRow);
- }
-
- @Teardown
- public void close() {
- executor.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java
deleted file mode 100644
index 5169749..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSql pipeline.
- */
-package org.apache.beam.dsls.sql.transform;
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
deleted file mode 100644
index 4b8696b..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.utils;
-
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Utility methods for Calcite related operations.
- */
-public class CalciteUtils {
- private static final Map<Integer, SqlTypeName> JAVA_TO_CALCITE_MAPPING = new HashMap<>();
- private static final Map<SqlTypeName, Integer> CALCITE_TO_JAVA_MAPPING = new HashMap<>();
- static {
- JAVA_TO_CALCITE_MAPPING.put(Types.TINYINT, SqlTypeName.TINYINT);
- JAVA_TO_CALCITE_MAPPING.put(Types.SMALLINT, SqlTypeName.SMALLINT);
- JAVA_TO_CALCITE_MAPPING.put(Types.INTEGER, SqlTypeName.INTEGER);
- JAVA_TO_CALCITE_MAPPING.put(Types.BIGINT, SqlTypeName.BIGINT);
-
- JAVA_TO_CALCITE_MAPPING.put(Types.FLOAT, SqlTypeName.FLOAT);
- JAVA_TO_CALCITE_MAPPING.put(Types.DOUBLE, SqlTypeName.DOUBLE);
-
- JAVA_TO_CALCITE_MAPPING.put(Types.DECIMAL, SqlTypeName.DECIMAL);
-
- JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR);
- JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR);
-
- JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE);
- JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME);
- JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP);
-
- JAVA_TO_CALCITE_MAPPING.put(Types.BOOLEAN, SqlTypeName.BOOLEAN);
-
- for (Map.Entry<Integer, SqlTypeName> pair : JAVA_TO_CALCITE_MAPPING.entrySet()) {
- CALCITE_TO_JAVA_MAPPING.put(pair.getValue(), pair.getKey());
- }
- }
-
- /**
- * Get the corresponding {@code SqlTypeName} for an integer sql type.
- */
- public static SqlTypeName toCalciteType(int type) {
- return JAVA_TO_CALCITE_MAPPING.get(type);
- }
-
- /**
- * Get the integer sql type from Calcite {@code SqlTypeName}.
- */
- public static Integer toJavaType(SqlTypeName typeName) {
- return CALCITE_TO_JAVA_MAPPING.get(typeName);
- }
-
- /**
- * Get the {@code SqlTypeName} for the specified column of a table.
- */
- public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) {
- return toCalciteType(schema.getFieldsType().get(index));
- }
-
- /**
- * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
- */
- public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) {
- List<String> fieldNames = new ArrayList<>();
- List<Integer> fieldTypes = new ArrayList<>();
- for (RelDataTypeField f : tableInfo.getFieldList()) {
- fieldNames.add(f.getName());
- fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
- }
- return BeamSqlRowType.create(fieldNames, fieldTypes);
- }
-
- /**
- * Create an instance of {@code RelDataType} so it can be used to create a table.
- */
- public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) {
- return new RelProtoDataType() {
- @Override
- public RelDataType apply(RelDataTypeFactory a) {
- RelDataTypeFactory.FieldInfoBuilder builder = a.builder();
- for (int idx = 0; idx < that.getFieldsName().size(); ++idx) {
- builder.add(that.getFieldsName().get(idx), toCalciteType(that.getFieldsType().get(idx)));
- }
- return builder.build();
- }
- };
- }
-}