You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2019/09/20 02:55:27 UTC
[beam] branch master updated: [BEAM-8203] Add support for AvroTable
in SQL
This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 94a8b26 [BEAM-8203] Add support for AvroTable in SQL
new b8b7bce Merge pull request #9597 from bmv126/avro-table-provider-support-in-sql
94a8b26 is described below
commit 94a8b267a994a1f168a56a6ca82b481a273997ee
Author: B M VISHWAS <b_...@nokia.com>
AuthorDate: Tue Sep 17 08:25:16 2019 -0500
[BEAM-8203] Add support for AvroTable in SQL
---
.../apache/beam/sdk/schemas/utils/AvroUtils.java | 2 +-
.../sql/meta/provider/avro/AvroTable.java | 81 ++++++++++++++++++++++
.../sql/meta/provider/avro/AvroTableProvider.java | 52 ++++++++++++++
.../provider/avro/GenericRecordWriteConverter.java | 67 ++++++++++++++++++
.../sql/meta/provider/avro/package-info.java | 20 ++++++
.../meta/provider/avro/AvroTableProviderTest.java | 77 ++++++++++++++++++++
6 files changed, 298 insertions(+), 1 deletion(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 7a87aef..7b43961 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -204,7 +204,7 @@ public class AvroUtils {
}
/** Converts a Beam Schema into an AVRO schema. */
- private static org.apache.avro.Schema toAvroSchema(
+ public static org.apache.avro.Schema toAvroSchema(
Schema beamSchema, @Nullable String name, @Nullable String namespace) {
final String schemaName = Strings.isNullOrEmpty(name) ? "topLevelRecord" : name;
final String schemaNamespace = namespace == null ? "" : namespace;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTable.java
new file mode 100644
index 0000000..7fab186
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTable.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+
+/** A {@link org.apache.beam.sdk.extensions.sql.BeamSqlTable} that reads and writes Avro files. */
+public class AvroTable extends BaseBeamTable implements Serializable {
+  private final String filePattern;
+  private final String tableName;
+
+  public AvroTable(String tableName, Schema beamSchema, String filePattern) {
+    super(beamSchema);
+    this.filePattern = filePattern;
+    this.tableName = tableName;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    // withBeamSchemas(true) gives the records a Beam schema, which Convert.toRows() relies on.
+    return begin
+        .apply(
+            "AvroIORead",
+            AvroIO.readGenericRecords(AvroUtils.toAvroSchema(schema, tableName, null))
+                .withBeamSchemas(true)
+                .from(filePattern))
+        .apply("GenericRecordToRow", Convert.toRows());
+  }
+
+  @Override
+  public PDone buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, PCollection<GenericRecord>> writeConverter =
+        GenericRecordWriteConverter.builder().beamSchema(schema).build();
+    // withoutSharding() forces a single output file with no shard suffix (see AvroIO docs).
+    return input
+        .apply("RowToGenericRecord", writeConverter)
+        .apply(
+            "AvroIOWrite",
+            AvroIO.writeGenericRecords(AvroUtils.toAvroSchema(schema, tableName, null))
+                .to(filePattern)
+                .withoutSharding());
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  @Override
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    return BeamTableStatistics.BOUNDED_UNKNOWN;
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProvider.java
new file mode 100644
index 0000000..08dd7f7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+
+/**
+ * {@link TableProvider} for {@link AvroTable}, registered under the table type {@code avro}.
+ *
+ * <p>An example Avro table definition:
+ *
+ * <pre>{@code
+ * CREATE EXTERNAL TABLE PERSONS(
+ *   name VARCHAR,
+ *   favorite_color VARCHAR,
+ *   favorite_numbers ARRAY<INTEGER>
+ * )
+ * TYPE 'avro'
+ * LOCATION '/tmp/persons.avro'
+ * }</pre>
+ */
+@AutoService(TableProvider.class)
+public class AvroTableProvider extends InMemoryMetaTableProvider {
+ @Override
+ public String getTableType() {
+ return "avro";
+ }
+
+ @Override
+ public BeamSqlTable buildBeamSqlTable(Table table) {
+ return new AvroTable(table.getName(), table.getSchema(), table.getLocation());
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/GenericRecordWriteConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/GenericRecordWriteConverter.java
new file mode 100644
index 0000000..6ca1621
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/GenericRecordWriteConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/** A {@link PTransform} that converts {@link Row}s to Avro {@link GenericRecord}s. */
+@AutoValue
+public abstract class GenericRecordWriteConverter
+ extends PTransform<PCollection<Row>, PCollection<GenericRecord>> implements Serializable {
+ /** Beam schema from which the Avro schema of the output records and their coder is derived. */
+ public abstract Schema beamSchema();
+
+ public static Builder builder() {
+ return new AutoValue_GenericRecordWriteConverter.Builder();
+ }
+ // NOTE(review): toAvroSchema(beamSchema()) below runs once per element; consider caching it.
+ @Override
+ public PCollection<GenericRecord> expand(PCollection<Row> input) {
+ return input
+ .apply(
+ "RowsToGenericRecord",
+ ParDo.of(
+ new DoFn<Row, GenericRecord>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ GenericRecord genericRecord =
+ AvroUtils.toGenericRecord(
+ c.element(), AvroUtils.toAvroSchema(beamSchema()));
+ c.output(genericRecord);
+ }
+ }))
+ .setCoder(AvroCoder.of(GenericRecord.class, AvroUtils.toAvroSchema(beamSchema())));
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ public abstract Builder beamSchema(Schema beamSchema);
+
+ public abstract GenericRecordWriteConverter build();
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/package-info.java
new file mode 100644
index 0000000..f50679c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Table provider for Avro files, backed by AvroIO. */
+package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProviderTest.java
new file mode 100644
index 0000000..60bd6f7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProviderTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests writing to and reading from Avro files through {@link AvroTableProvider}. */
+@RunWith(JUnit4.class)
+public class AvroTableProviderTest {
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private static final String AVRO_FIELD_NAMES = "(name VARCHAR, age BIGINT, country VARCHAR)";
+
+  private static final Schema OUTPUT_ROW_SCHEMA =
+      Schema.builder().addInt64Field("age").addStringField("country").build();
+
+  @Test
+  public void testReadAndWriteAvroTable() {
+    File destinationFile = new File(tempFolder.getRoot(), "person-info.avro");
+
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new AvroTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE PersonInfo %s TYPE avro LOCATION '%s'",
+            AVRO_FIELD_NAMES, destinationFile.getAbsolutePath()));
+
+    BeamSqlRelUtils.toPCollection(
+        writePipeline,
+        env.parseQuery(
+            "INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')"));
+    // The write must finish before the read pipeline runs against the same file.
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(
+            readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo where age > 25"));
+
+    PAssert.that(rows)
+        .containsInAnyOrder(Row.withSchema(OUTPUT_ROW_SCHEMA).addValues(42L, "USA").build());
+
+    PipelineResult.State state = readPipeline.run().waitUntilFinish();
+    assertEquals(State.DONE, state);
+  }
+}