You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/15 18:41:58 UTC
[3/5] beam git commit: [BEAM-2740] Hide BeamSqlEnv.
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java
new file mode 100644
index 0000000..4bedec1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaCSVTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * A {@link BeamKafkaTable} whose record values each hold one CSV-formatted line.
+ *
+ * <p>Values are decoded and encoded as UTF-8 so the result does not depend on the default
+ * charset of the JVM that happens to run the pipeline. Keys are ignored when reading and are
+ * written as empty byte arrays.
+ */
+public class BeamKafkaCSVTable extends BeamKafkaTable {
+  private final CSVFormat csvFormat;
+
+  /**
+   * Creates a CSV table over {@code topics} that uses {@link CSVFormat#DEFAULT}.
+   */
+  public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
+      List<String> topics) {
+    this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
+  }
+
+  /**
+   * Creates a CSV table over {@code topics} that parses and prints lines with {@code format}.
+   */
+  public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
+      List<String> topics, CSVFormat format) {
+    super(beamSqlRowType, bootstrapServers, topics);
+    this.csvFormat = format;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
+      getPTransformForInput() {
+    return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
+  }
+
+  @Override
+  public PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
+      getPTransformForOutput() {
+    return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
+  }
+
+  /**
+   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamRecord}.
+   *
+   * <p>Only the value of each pair is read; it is interpreted as a UTF-8 encoded CSV line.
+   */
+  public static class CsvRecorderDecoder
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> {
+    private final BeamRecordSqlType rowType;
+    private final CSVFormat format;
+
+    public CsvRecorderDecoder(BeamRecordSqlType rowType, CSVFormat format) {
+      this.rowType = rowType;
+      this.format = format;
+    }
+
+    @Override
+    public PCollection<BeamRecord> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamRecord>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          // Explicit charset: new String(byte[]) would fall back to the JVM default.
+          String rowInString = new String(c.element().getValue(), StandardCharsets.UTF_8);
+          c.output(BeamTableUtils.csvLine2BeamSqlRow(format, rowInString, rowType));
+        }
+      }));
+    }
+  }
+
+  /**
+   * A PTransform to convert {@link BeamRecord} to {@code KV<byte[], byte[]>}.
+   *
+   * <p>Each record becomes a pair of an empty key and the UTF-8 bytes of its CSV line.
+   */
+  public static class CsvRecorderEncoder
+      extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> {
+    private final BeamRecordSqlType rowType;
+    private final CSVFormat format;
+
+    public CsvRecorderEncoder(BeamRecordSqlType rowType, CSVFormat format) {
+      this.rowType = rowType;
+      this.format = format;
+    }
+
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamRecord> input) {
+      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, KV<byte[], byte[]>>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          BeamRecord in = c.element();
+          // Explicit charset: String.getBytes() would fall back to the JVM default.
+          byte[] csvLine =
+              BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes(StandardCharsets.UTF_8);
+          c.output(KV.of(new byte[] {}, csvLine));
+        }
+      }));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java
new file mode 100644
index 0000000..1113abf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/BeamKafkaTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+/**
+ * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
+ * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
+ *
+ */
+public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
+
+ private String bootstrapServers;
+ private List<String> topics;
+ private Map<String, Object> configUpdates;
+
+ protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
+ super(beamSqlRowType);
+ }
+
+ public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
+ List<String> topics) {
+ super(beamSqlRowType);
+ this.bootstrapServers = bootstrapServers;
+ this.topics = topics;
+ }
+
+ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
+ this.configUpdates = configUpdates;
+ return this;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.UNBOUNDED;
+ }
+
+ public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
+ getPTransformForInput();
+
+ public abstract PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
+ getPTransformForOutput();
+
+ @Override
+ public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+ return PBegin.in(pipeline).apply("read",
+ KafkaIO.<byte[], byte[]>read()
+ .withBootstrapServers(bootstrapServers)
+ .withTopics(topics)
+ .updateConsumerProperties(configUpdates)
+ .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+ .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+ .withoutMetadata())
+ .apply("in_format", getPTransformForInput());
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
+ checkArgument(topics != null && topics.size() == 1,
+ "Only one topic can be acceptable as output.");
+
+ return new PTransform<PCollection<BeamRecord>, PDone>() {
+ @Override
+ public PDone expand(PCollection<BeamRecord> input) {
+ return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
+ KafkaIO.<byte[], byte[]>write()
+ .withBootstrapServers(bootstrapServers)
+ .withTopic(topics.get(0))
+ .withKeySerializer(ByteArraySerializer.class)
+ .withValueSerializer(ByteArraySerializer.class));
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java
new file mode 100644
index 0000000..6752e3c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema for KafkaIO.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.kafka;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java
new file mode 100644
index 0000000..86e7d06
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines table schemas that map to Beam IO components.
+ *
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java
new file mode 100644
index 0000000..a2dd6fb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code BeamTextCSVTable} is a {@code BeamTextTable} which formatted in CSV.
+ *
+ * <p>
+ * {@link CSVFormat} itself has many dialects, check its javadoc for more info.
+ * </p>
+ */
+public class BeamTextCSVTable extends BeamTextTable {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(BeamTextCSVTable.class);
+
+ private CSVFormat csvFormat;
+
+ /**
+ * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
+ */
+ public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
+ this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
+ }
+
+ public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern,
+ CSVFormat csvFormat) {
+ super(beamSqlRowType, filePattern);
+ this.csvFormat = csvFormat;
+ }
+
+ @Override
+ public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+ return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
+ .apply("parseCSVLine",
+ new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
+ return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java
new file mode 100644
index 0000000..95f7063
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOReader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * IOReader for {@code BeamTextCSVTable}: parses each input line into a {@link BeamRecord}.
+ */
+public class BeamTextCSVTableIOReader
+    extends PTransform<PCollection<String>, PCollection<BeamRecord>>
+    implements Serializable {
+  private String filePattern;
+  protected BeamRecordSqlType beamSqlRowType;
+  protected CSVFormat csvFormat;
+
+  /**
+   * Creates a reader that parses lines with {@code csvFormat} into rows of
+   * {@code beamSqlRowType}.
+   */
+  public BeamTextCSVTableIOReader(BeamRecordSqlType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) {
+    this.filePattern = filePattern;
+    this.beamSqlRowType = beamSqlRowType;
+    this.csvFormat = csvFormat;
+  }
+
+  @Override
+  public PCollection<BeamRecord> expand(PCollection<String> input) {
+    // Kept as an anonymous DoFn so the transform name derived from it does not change.
+    DoFn<String, BeamRecord> parseLine = new DoFn<String, BeamRecord>() {
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        BeamRecord parsed =
+            BeamTableUtils.csvLine2BeamSqlRow(csvFormat, context.element(), beamSqlRowType);
+        context.output(parsed);
+      }
+    };
+    return input.apply(ParDo.of(parseLine));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java
new file mode 100644
index 0000000..4660ccb
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextCSVTableIOWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * IOWriter for {@code BeamTextCSVTable}: prints each {@link BeamRecord} as a CSV line and
+ * writes the lines with {@link TextIO}.
+ */
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone>
+    implements Serializable {
+  private String filePattern;
+  protected BeamRecordSqlType beamSqlRowType;
+  protected CSVFormat csvFormat;
+
+  /**
+   * Creates a writer that prints records with {@code csvFormat} and writes them to
+   * {@code filePattern}.
+   */
+  public BeamTextCSVTableIOWriter(BeamRecordSqlType beamSqlRowType, String filePattern,
+      CSVFormat csvFormat) {
+    this.filePattern = filePattern;
+    this.beamSqlRowType = beamSqlRowType;
+    this.csvFormat = csvFormat;
+  }
+
+  @Override
+  public PDone expand(PCollection<BeamRecord> input) {
+    DoFn<BeamRecord, String> toCsvLine = new DoFn<BeamRecord, String>() {
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        String line = BeamTableUtils.beamSqlRow2CsvLine(context.element(), csvFormat);
+        context.output(line);
+      }
+    };
+    PCollection<String> csvLines = input.apply("encodeRecord", ParDo.of(toCsvLine));
+    return csvLines.apply(TextIO.write().to(filePattern));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java
new file mode 100644
index 0000000..b0d9c11
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/BeamTextTable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
+
+/**
+ * {@code BeamTextTable} represents a text file or directory (backed by {@code TextIO}).
+ *
+ * <p>Text tables always report themselves as {@link BeamIOType#BOUNDED}.
+ */
+public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
+  /** File pattern that locates the text file(s) behind this table. */
+  protected String filePattern;
+
+  /**
+   * Creates a text table with the given row type over {@code filePattern}.
+   */
+  protected BeamTextTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
+    super(beamSqlRowType);
+    this.filePattern = filePattern;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.BOUNDED;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java
new file mode 100644
index 0000000..8927dca
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/text/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema for text files.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.schema.text;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index 40b7b58..9a50e21 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -33,11 +33,11 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 7a8d10d..3c6b20f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.BeamRecord;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index aac38c7..719fbf3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -19,10 +19,10 @@ package org.apache.beam.sdk.extensions.sql.impl.transform;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 8b6206b..8c44780 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
deleted file mode 100644
index 0564820..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
- */
-public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
- protected BeamRecordSqlType beamSqlRowType;
- public BaseBeamTable(BeamRecordSqlType beamSqlRowType) {
- this.beamSqlRowType = beamSqlRowType;
- }
-
- @Override public BeamRecordSqlType getRowType() {
- return beamSqlRowType;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
deleted file mode 100644
index bda3ca1..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Type as a source IO, determined whether it's a STREAMING process, or batch
- * process.
- */
-public enum BeamIOType implements Serializable {
- BOUNDED, UNBOUNDED;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
deleted file mode 100644
index 9d9988e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
- * then a downstream query can query directly.
- */
-public class BeamPCollectionTable extends BaseBeamTable {
- private BeamIOType ioType;
- private transient PCollection<BeamRecord> upstream;
-
- protected BeamPCollectionTable(BeamRecordSqlType beamSqlRowType) {
- super(beamSqlRowType);
- }
-
- public BeamPCollectionTable(PCollection<BeamRecord> upstream,
- BeamRecordSqlType beamSqlRowType){
- this(beamSqlRowType);
- ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
- ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
- this.upstream = upstream;
- }
-
- @Override
- public BeamIOType getSourceType() {
- return ioType;
- }
-
- @Override
- public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
- return upstream;
- }
-
- @Override
- public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
- throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
deleted file mode 100644
index 1845988..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamRecordSqlType.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.BeamRecordType;
-
-/**
- * Type provider for {@link BeamRecord} with SQL types.
- *
- * <p>Limited SQL types are supported now, visit
- * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
- * for more details.
- *
- */
-public class BeamRecordSqlType extends BeamRecordType {
- private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
- static {
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
- }
-
- public List<Integer> fieldTypes;
-
- protected BeamRecordSqlType(List<String> fieldsName, List<Coder> fieldsCoder) {
- super(fieldsName, fieldsCoder);
- }
-
- private BeamRecordSqlType(List<String> fieldsName, List<Integer> fieldTypes
- , List<Coder> fieldsCoder) {
- super(fieldsName, fieldsCoder);
- this.fieldTypes = fieldTypes;
- }
-
- public static BeamRecordSqlType create(List<String> fieldNames,
- List<Integer> fieldTypes) {
- if (fieldNames.size() != fieldTypes.size()) {
- throw new IllegalStateException("the sizes of 'dataType' and 'fieldTypes' must match.");
- }
- List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
- for (int idx = 0; idx < fieldTypes.size(); ++idx) {
- switch (fieldTypes.get(idx)) {
- case Types.INTEGER:
- fieldCoders.add(BigEndianIntegerCoder.of());
- break;
- case Types.SMALLINT:
- fieldCoders.add(ShortCoder.of());
- break;
- case Types.TINYINT:
- fieldCoders.add(ByteCoder.of());
- break;
- case Types.DOUBLE:
- fieldCoders.add(DoubleCoder.of());
- break;
- case Types.FLOAT:
- fieldCoders.add(FloatCoder.of());
- break;
- case Types.DECIMAL:
- fieldCoders.add(BigDecimalCoder.of());
- break;
- case Types.BIGINT:
- fieldCoders.add(BigEndianLongCoder.of());
- break;
- case Types.VARCHAR:
- case Types.CHAR:
- fieldCoders.add(StringUtf8Coder.of());
- break;
- case Types.TIME:
- fieldCoders.add(TimeCoder.of());
- break;
- case Types.DATE:
- case Types.TIMESTAMP:
- fieldCoders.add(DateCoder.of());
- break;
- case Types.BOOLEAN:
- fieldCoders.add(BooleanCoder.of());
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Data type: " + fieldTypes.get(idx) + " not supported yet!");
- }
- }
- return new BeamRecordSqlType(fieldNames, fieldTypes, fieldCoders);
- }
-
- @Override
- public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
- if (null == fieldValue) {// no need to do type check for NULL value
- return;
- }
-
- int fieldType = fieldTypes.get(index);
- Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
- if (javaClazz == null) {
- throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
- }
-
- if (!fieldValue.getClass().equals(javaClazz)) {
- throw new IllegalArgumentException(
- String.format("[%s](%s) doesn't match type [%s]",
- fieldValue, fieldValue.getClass(), fieldType)
- );
- }
- }
-
- public List<Integer> getFieldTypes() {
- return fieldTypes;
- }
-
- public Integer getFieldTypeByIndex(int index){
- return fieldTypes.get(index);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj != null && obj instanceof BeamRecordSqlType) {
- BeamRecordSqlType ins = (BeamRecordSqlType) obj;
- return fieldTypes.equals(ins.getFieldTypes()) && getFieldNames().equals(ins.getFieldNames());
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return 31 * getFieldNames().hashCode() + getFieldTypes().hashCode();
- }
-
- @Override
- public String toString() {
- return "BeamRecordSqlType [fieldNames=" + getFieldNames()
- + ", fieldTypes=" + fieldTypes + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
deleted file mode 100644
index 89eefd1..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.values.BeamRecord;
-
-/**
- * A {@link Coder} encodes {@link BeamRecord}.
- */
-@Experimental
-public class BeamSqlRecordHelper {
-
- public static BeamRecordSqlType getSqlRecordType(BeamRecord record) {
- return (BeamRecordSqlType) record.getDataType();
- }
-
- /**
- * {@link Coder} for Java type {@link Short}.
- */
- public static class ShortCoder extends CustomCoder<Short> {
- private static final ShortCoder INSTANCE = new ShortCoder();
-
- public static ShortCoder of() {
- return INSTANCE;
- }
-
- private ShortCoder() {
- }
-
- @Override
- public void encode(Short value, OutputStream outStream) throws CoderException, IOException {
- new DataOutputStream(outStream).writeShort(value);
- }
-
- @Override
- public Short decode(InputStream inStream) throws CoderException, IOException {
- return new DataInputStream(inStream).readShort();
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- }
- }
- /**
- * {@link Coder} for Java type {@link Float}, it's stored as {@link BigDecimal}.
- */
- public static class FloatCoder extends CustomCoder<Float> {
- private static final FloatCoder INSTANCE = new FloatCoder();
- private static final BigDecimalCoder CODER = BigDecimalCoder.of();
-
- public static FloatCoder of() {
- return INSTANCE;
- }
-
- private FloatCoder() {
- }
-
- @Override
- public void encode(Float value, OutputStream outStream) throws CoderException, IOException {
- CODER.encode(new BigDecimal(value), outStream);
- }
-
- @Override
- public Float decode(InputStream inStream) throws CoderException, IOException {
- return CODER.decode(inStream).floatValue();
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- }
- }
- /**
- * {@link Coder} for Java type {@link Double}, it's stored as {@link BigDecimal}.
- */
- public static class DoubleCoder extends CustomCoder<Double> {
- private static final DoubleCoder INSTANCE = new DoubleCoder();
- private static final BigDecimalCoder CODER = BigDecimalCoder.of();
-
- public static DoubleCoder of() {
- return INSTANCE;
- }
-
- private DoubleCoder() {
- }
-
- @Override
- public void encode(Double value, OutputStream outStream) throws CoderException, IOException {
- CODER.encode(new BigDecimal(value), outStream);
- }
-
- @Override
- public Double decode(InputStream inStream) throws CoderException, IOException {
- return CODER.decode(inStream).doubleValue();
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- }
- }
-
- /**
- * {@link Coder} for Java type {@link GregorianCalendar}, it's stored as {@link Long}.
- */
- public static class TimeCoder extends CustomCoder<GregorianCalendar> {
- private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
- private static final TimeCoder INSTANCE = new TimeCoder();
-
- public static TimeCoder of() {
- return INSTANCE;
- }
-
- private TimeCoder() {
- }
-
- @Override
- public void encode(GregorianCalendar value, OutputStream outStream)
- throws CoderException, IOException {
- longCoder.encode(value.getTime().getTime(), outStream);
- }
-
- @Override
- public GregorianCalendar decode(InputStream inStream) throws CoderException, IOException {
- GregorianCalendar calendar = new GregorianCalendar();
- calendar.setTime(new Date(longCoder.decode(inStream)));
- return calendar;
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- }
- }
- /**
- * {@link Coder} for Java type {@link Date}, it's stored as {@link Long}.
- */
- public static class DateCoder extends CustomCoder<Date> {
- private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
- private static final DateCoder INSTANCE = new DateCoder();
-
- public static DateCoder of() {
- return INSTANCE;
- }
-
- private DateCoder() {
- }
-
- @Override
- public void encode(Date value, OutputStream outStream) throws CoderException, IOException {
- longCoder.encode(value.getTime(), outStream);
- }
-
- @Override
- public Date decode(InputStream inStream) throws CoderException, IOException {
- return new Date(longCoder.decode(inStream));
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- }
- }
-
- /**
- * {@link Coder} for Java type {@link Boolean}.
- */
- public static class BooleanCoder extends CustomCoder<Boolean> {
- private static final BooleanCoder INSTANCE = new BooleanCoder();
-
- public static BooleanCoder of() {
- return INSTANCE;
- }
-
- private BooleanCoder() {
- }
-
- @Override
- public void encode(Boolean value, OutputStream outStream) throws CoderException, IOException {
- new DataOutputStream(outStream).writeBoolean(value);
- }
-
- @Override
- public Boolean decode(InputStream inStream) throws CoderException, IOException {
- return new DataInputStream(inStream).readBoolean();
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
deleted file mode 100644
index 828ac43..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * This interface defines a Beam Sql Table.
- */
-public interface BeamSqlTable {
- /**
- * In Beam SQL, there's no difference between a batch query and a streaming
- * query. {@link BeamIOType} is used to validate the sources.
- */
- BeamIOType getSourceType();
-
- /**
- * create a {@code PCollection<BeamSqlRow>} from source.
- *
- */
- PCollection<BeamRecord> buildIOReader(Pipeline pipeline);
-
- /**
- * create a {@code IO.write()} instance to write to target.
- *
- */
- PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter();
-
- /**
- * Get the schema info of the table.
- */
- BeamRecordSqlType getRowType();
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
deleted file mode 100644
index 191b78e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Interface to create a UDF in Beam SQL.
- *
- * <p>A static method {@code eval} is required. Here is an example:
- *
- * <blockquote><pre>
- * public static class MyLeftFunction {
- * public String eval(
- * @Parameter(name = "s") String s,
- * @Parameter(name = "n", optional = true) Integer n) {
- * return s.substring(0, n == null ? 1 : n);
- * }
- * }</pre></blockquote>
- *
- * <p>The first parameter is named "s" and is mandatory,
- * and the second parameter is named "n" and is optional.
- */
-public interface BeamSqlUdf extends Serializable {
- String UDF_METHOD = "eval";
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
deleted file mode 100644
index 687a082..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.NlsString;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVPrinter;
-import org.apache.commons.csv.CSVRecord;
-
-/**
- * Utility methods for working with {@code BeamTable}.
- */
-public final class BeamTableUtils {
- public static BeamRecord csvLine2BeamSqlRow(
- CSVFormat csvFormat,
- String line,
- BeamRecordSqlType beamRecordSqlType) {
- List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.getFieldCount());
- try (StringReader reader = new StringReader(line)) {
- CSVParser parser = csvFormat.parse(reader);
- CSVRecord rawRecord = parser.getRecords().get(0);
-
- if (rawRecord.size() != beamRecordSqlType.getFieldCount()) {
- throw new IllegalArgumentException(String.format(
- "Expect %d fields, but actually %d",
- beamRecordSqlType.getFieldCount(), rawRecord.size()
- ));
- } else {
- for (int idx = 0; idx < beamRecordSqlType.getFieldCount(); idx++) {
- String raw = rawRecord.get(idx);
- fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw));
- }
- }
- } catch (IOException e) {
- throw new IllegalArgumentException("decodeRecord failed!", e);
- }
- return new BeamRecord(beamRecordSqlType, fieldsValue);
- }
-
- public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
- StringWriter writer = new StringWriter();
- try (CSVPrinter printer = csvFormat.print(writer)) {
- for (int i = 0; i < row.getFieldCount(); i++) {
- printer.print(row.getFieldValue(i).toString());
- }
- printer.println();
- } catch (IOException e) {
- throw new IllegalArgumentException("encodeRecord failed!", e);
- }
- return writer.toString();
- }
-
- public static Object autoCastField(int fieldType, Object rawObj) {
- if (rawObj == null) {
- return null;
- }
-
- SqlTypeName columnType = CalciteUtils.toCalciteType(fieldType);
- // auto-casting for numberics
- if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
- || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
- String raw = rawObj.toString();
- switch (columnType) {
- case TINYINT:
- return Byte.valueOf(raw);
- case SMALLINT:
- return Short.valueOf(raw);
- case INTEGER:
- return Integer.valueOf(raw);
- case BIGINT:
- return Long.valueOf(raw);
- case FLOAT:
- return Float.valueOf(raw);
- case DOUBLE:
- return Double.valueOf(raw);
- default:
- throw new UnsupportedOperationException(
- String.format("Column type %s is not supported yet!", columnType));
- }
- } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
- // convert NlsString to String
- if (rawObj instanceof NlsString) {
- return ((NlsString) rawObj).getValue();
- } else {
- return rawObj;
- }
- } else {
- return rawObj;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
deleted file mode 100644
index 8c7e6f0..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema.kafka;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.csv.CSVFormat;
-
-/**
- * A Kafka topic that saves records as CSV format.
- *
- */
-public class BeamKafkaCSVTable extends BeamKafkaTable {
- private CSVFormat csvFormat;
- public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
- List<String> topics) {
- this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
- }
-
- public BeamKafkaCSVTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
- List<String> topics, CSVFormat format) {
- super(beamSqlRowType, bootstrapServers, topics);
- this.csvFormat = format;
- }
-
- @Override
- public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
- getPTransformForInput() {
- return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
- }
-
- @Override
- public PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
- getPTransformForOutput() {
- return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
- }
-
- /**
- * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamRecord}.
- *
- */
- public static class CsvRecorderDecoder
- extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> {
- private BeamRecordSqlType rowType;
- private CSVFormat format;
- public CsvRecorderDecoder(BeamRecordSqlType rowType, CSVFormat format) {
- this.rowType = rowType;
- this.format = format;
- }
-
- @Override
- public PCollection<BeamRecord> expand(PCollection<KV<byte[], byte[]>> input) {
- return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamRecord>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- String rowInString = new String(c.element().getValue());
- c.output(BeamTableUtils.csvLine2BeamSqlRow(format, rowInString, rowType));
- }
- }));
- }
- }
-
- /**
- * A PTransform to convert {@link BeamRecord} to {@code KV<byte[], byte[]>}.
- *
- */
- public static class CsvRecorderEncoder
- extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> {
- private BeamRecordSqlType rowType;
- private CSVFormat format;
- public CsvRecorderEncoder(BeamRecordSqlType rowType, CSVFormat format) {
- this.rowType = rowType;
- this.format = format;
- }
-
- @Override
- public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamRecord> input) {
- return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, KV<byte[], byte[]>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- BeamRecord in = c.element();
- c.output(KV.of(new byte[] {}, BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes()));
- }
- }));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
deleted file mode 100644
index 1d57839..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema.kafka;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-
-/**
- * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
- * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
- *
- */
-public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
-
- private String bootstrapServers;
- private List<String> topics;
- private Map<String, Object> configUpdates;
-
- protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
- super(beamSqlRowType);
- }
-
- public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, String bootstrapServers,
- List<String> topics) {
- super(beamSqlRowType);
- this.bootstrapServers = bootstrapServers;
- this.topics = topics;
- }
-
- public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
- this.configUpdates = configUpdates;
- return this;
- }
-
- @Override
- public BeamIOType getSourceType() {
- return BeamIOType.UNBOUNDED;
- }
-
- public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
- getPTransformForInput();
-
- public abstract PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
- getPTransformForOutput();
-
- @Override
- public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
- return PBegin.in(pipeline).apply("read",
- KafkaIO.<byte[], byte[]>read()
- .withBootstrapServers(bootstrapServers)
- .withTopics(topics)
- .updateConsumerProperties(configUpdates)
- .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
- .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
- .withoutMetadata())
- .apply("in_format", getPTransformForInput());
- }
-
- @Override
- public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
- checkArgument(topics != null && topics.size() == 1,
- "Only one topic can be acceptable as output.");
-
- return new PTransform<PCollection<BeamRecord>, PDone>() {
- @Override
- public PDone expand(PCollection<BeamRecord> input) {
- return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
- KafkaIO.<byte[], byte[]>write()
- .withBootstrapServers(bootstrapServers)
- .withTopic(topics.get(0))
- .withKeySerializer(ByteArraySerializer.class)
- .withValueSerializer(ByteArraySerializer.class));
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
deleted file mode 100644
index f0ddeb6..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * table schema for KafkaIO.
- */
-package org.apache.beam.sdk.extensions.sql.schema.kafka;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java
deleted file mode 100644
index 9655ebd..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * define table schema, to map with Beam IO components.
- *
- */
-package org.apache.beam.sdk.extensions.sql.schema;
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
deleted file mode 100644
index 79e56e6..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema.text;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.commons.csv.CSVFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@code BeamTextCSVTable} is a {@code BeamTextTable} which formatted in CSV.
- *
- * <p>
- * {@link CSVFormat} itself has many dialects, check its javadoc for more info.
- * </p>
- */
-public class BeamTextCSVTable extends BeamTextTable {
- private static final Logger LOG = LoggerFactory
- .getLogger(BeamTextCSVTable.class);
-
- private CSVFormat csvFormat;
-
- /**
- * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
- */
- public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
- this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
- }
-
- public BeamTextCSVTable(BeamRecordSqlType beamSqlRowType, String filePattern,
- CSVFormat csvFormat) {
- super(beamSqlRowType, filePattern);
- this.csvFormat = csvFormat;
- }
-
- @Override
- public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
- return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
- .apply("parseCSVLine",
- new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
- }
-
- @Override
- public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
- return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
deleted file mode 100644
index 018dae5..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema.text;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.csv.CSVFormat;
-
-/**
- * IOReader for {@code BeamTextCSVTable}.
- */
-public class BeamTextCSVTableIOReader
- extends PTransform<PCollection<String>, PCollection<BeamRecord>>
- implements Serializable {
- private String filePattern;
- protected BeamRecordSqlType beamSqlRowType;
- protected CSVFormat csvFormat;
-
- public BeamTextCSVTableIOReader(BeamRecordSqlType beamSqlRowType, String filePattern,
- CSVFormat csvFormat) {
- this.filePattern = filePattern;
- this.beamSqlRowType = beamSqlRowType;
- this.csvFormat = csvFormat;
- }
-
- @Override
- public PCollection<BeamRecord> expand(PCollection<String> input) {
- return input.apply(ParDo.of(new DoFn<String, BeamRecord>() {
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- String str = ctx.element();
- ctx.output(BeamTableUtils.csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType));
- }
- }));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
deleted file mode 100644
index 53eb382..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema.text;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.commons.csv.CSVFormat;
-
-/**
- * IOWriter for {@code BeamTextCSVTable}.
- */
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone>
- implements Serializable {
- private String filePattern;
- protected BeamRecordSqlType beamSqlRowType;
- protected CSVFormat csvFormat;
-
- public BeamTextCSVTableIOWriter(BeamRecordSqlType beamSqlRowType, String filePattern,
- CSVFormat csvFormat) {
- this.filePattern = filePattern;
- this.beamSqlRowType = beamSqlRowType;
- this.csvFormat = csvFormat;
- }
-
- @Override public PDone expand(PCollection<BeamRecord> input) {
- return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, String>() {
-
- @ProcessElement public void processElement(ProcessContext ctx) {
- BeamRecord row = ctx.element();
- ctx.output(BeamTableUtils.beamSqlRow2CsvLine(row, csvFormat));
- }
- })).apply(TextIO.write().to(filePattern));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
deleted file mode 100644
index 80e81aa..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.schema.text;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
-
-/**
- * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
- */
-public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
- protected String filePattern;
-
- protected BeamTextTable(BeamRecordSqlType beamSqlRowType, String filePattern) {
- super(beamSqlRowType);
- this.filePattern = filePattern;
- }
-
- @Override
- public BeamIOType getSourceType() {
- return BeamIOType.BOUNDED;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/49aad927/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java
deleted file mode 100644
index f914e2e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Table schema for text files.
- */
-package org.apache.beam.sdk.extensions.sql.schema.text;