You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2019/09/20 02:55:27 UTC

[beam] branch master updated: [BEAM-8203] Add support for AvroTable in SQL

This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 94a8b26  [BEAM-8203] Add support for AvroTable in SQL
     new b8b7bce  Merge pull request #9597 from bmv126/avro-table-provider-support-in-sql
94a8b26 is described below

commit 94a8b267a994a1f168a56a6ca82b481a273997ee
Author: B M VISHWAS <b_...@nokia.com>
AuthorDate: Tue Sep 17 08:25:16 2019 -0500

    [BEAM-8203] Add support for AvroTable in SQL
---
 .../apache/beam/sdk/schemas/utils/AvroUtils.java   |  2 +-
 .../sql/meta/provider/avro/AvroTable.java          | 81 ++++++++++++++++++++++
 .../sql/meta/provider/avro/AvroTableProvider.java  | 52 ++++++++++++++
 .../provider/avro/GenericRecordWriteConverter.java | 67 ++++++++++++++++++
 .../sql/meta/provider/avro/package-info.java       | 20 ++++++
 .../meta/provider/avro/AvroTableProviderTest.java  | 77 ++++++++++++++++++++
 6 files changed, 298 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 7a87aef..7b43961 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -204,7 +204,7 @@ public class AvroUtils {
   }
 
   /** Converts a Beam Schema into an AVRO schema. */
-  private static org.apache.avro.Schema toAvroSchema(
+  public static org.apache.avro.Schema toAvroSchema(
       Schema beamSchema, @Nullable String name, @Nullable String namespace) {
     final String schemaName = Strings.isNullOrEmpty(name) ? "topLevelRecord" : name;
     final String schemaNamespace = namespace == null ? "" : namespace;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTable.java
new file mode 100644
index 0000000..7fab186
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTable.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link org.apache.beam.sdk.extensions.sql.BeamSqlTable} backed by Avro files.
+ *
+ * <p>Reads go through {@code AvroIO.readGenericRecords} and writes through {@code
+ * AvroIO.writeGenericRecords}. In both directions the Avro schema is derived from the table's
+ * Beam {@link Schema}, with the table name used as the Avro record name.
+ */
+public class AvroTable extends BaseBeamTable implements Serializable {
+  private final String filePattern;
+  private final String tableName;
+
+  /**
+   * Creates a table over the given location.
+   *
+   * @param tableName name of the SQL table; also used as the name of the derived Avro schema
+   * @param beamSchema Beam schema of the table rows
+   * @param filePattern location the table reads from and writes to
+   */
+  public AvroTable(String tableName, Schema beamSchema, String filePattern) {
+    super(beamSchema);
+    this.filePattern = filePattern;
+    this.tableName = tableName;
+  }
+
+  /** Reads generic records from {@code filePattern} and converts them to {@link Row}s. */
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    return begin
+        .apply(
+            "AvroIORead",
+            AvroIO.readGenericRecords(avroSchema()).withBeamSchemas(true).from(filePattern))
+        .apply("GenericRecordToRow", Convert.toRows());
+  }
+
+  /** Converts {@link Row}s to generic records and writes them, unsharded, to {@code filePattern}. */
+  @Override
+  public PDone buildIOWriter(PCollection<Row> input) {
+    PTransform<PCollection<Row>, PCollection<GenericRecord>> writeConverter =
+        GenericRecordWriteConverter.builder().beamSchema(schema).build();
+
+    return input
+        // This step turns Rows into GenericRecords, so it is labelled accordingly (the previous
+        // label was the reader's "GenericRecordToRow").
+        .apply("RowToGenericRecord", writeConverter)
+        .apply(
+            "AvroIOWrite",
+            AvroIO.writeGenericRecords(avroSchema()).to(filePattern).withoutSharding());
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  @Override
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    return BeamTableStatistics.BOUNDED_UNKNOWN;
+  }
+
+  /** Derives the Avro schema shared by the read and write paths, named after the table. */
+  private org.apache.avro.Schema avroSchema() {
+    return AvroUtils.toAvroSchema(schema, tableName, null);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProvider.java
new file mode 100644
index 0000000..08dd7f7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+
+/**
+ * A {@link TableProvider} that creates {@link AvroTable}s for tables declared with type {@code
+ * avro}.
+ *
+ * <p>Example table definition:
+ *
+ * <pre>{@code
+ * CREATE EXTERNAL TABLE PERSONS(
+ *   name VARCHAR,
+ *   favorite_color VARCHAR,
+ *   favorite_numbers ARRAY<INTEGER>
+ * )
+ * TYPE 'avro'
+ * LOCATION '/tmp/persons.avro'
+ * }</pre>
+ */
+@AutoService(TableProvider.class)
+public class AvroTableProvider extends InMemoryMetaTableProvider {
+  /** Returns the table type handled by this provider: {@code "avro"}. */
+  @Override
+  public String getTableType() {
+    return "avro";
+  }
+
+  /** Builds an {@link AvroTable} from the name, schema and location of the table definition. */
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    final String name = table.getName();
+    final String location = table.getLocation();
+    return new AvroTable(name, table.getSchema(), location);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/GenericRecordWriteConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/GenericRecordWriteConverter.java
new file mode 100644
index 0000000..6ca1621
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/GenericRecordWriteConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} to convert {@link Row} to {@link GenericRecord}.
+ *
+ * <p>The Avro schema of the output records, and of the output coder, is derived from {@link
+ * #beamSchema()} via {@code AvroUtils.toAvroSchema}.
+ */
+@AutoValue
+public abstract class GenericRecordWriteConverter
+    extends PTransform<PCollection<Row>, PCollection<GenericRecord>> implements Serializable {
+
+  /** Beam schema of the input rows, from which the output Avro schema is derived. */
+  public abstract Schema beamSchema();
+
+  /** Returns a new builder; {@code beamSchema} must be set before calling {@code build()}. */
+  public static Builder builder() {
+    return new AutoValue_GenericRecordWriteConverter.Builder();
+  }
+
+  @Override
+  public PCollection<GenericRecord> expand(PCollection<Row> input) {
+    return input
+        .apply(
+            "RowsToGenericRecord",
+            ParDo.of(
+                new DoFn<Row, GenericRecord>() {
+                  // Built once per DoFn instance in setup() instead of once per element. It is
+                  // transient so the Avro schema is never part of the serialized DoFn.
+                  private transient org.apache.avro.Schema avroSchema;
+
+                  @Setup
+                  public void setup() {
+                    avroSchema = AvroUtils.toAvroSchema(beamSchema());
+                  }
+
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    c.output(AvroUtils.toGenericRecord(c.element(), avroSchema));
+                  }
+                }))
+        .setCoder(AvroCoder.of(GenericRecord.class, AvroUtils.toAvroSchema(beamSchema())));
+  }
+
+  /** Builder for {@link GenericRecordWriteConverter}. */
+  @AutoValue.Builder
+  abstract static class Builder {
+    public abstract Builder beamSchema(Schema beamSchema);
+
+    public abstract GenericRecordWriteConverter build();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/package-info.java
new file mode 100644
index 0000000..f50679c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Table provider and table implementation for reading and writing Avro files via AvroIO. */
+package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProviderTest.java
new file mode 100644
index 0000000..60bd6f7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/avro/AvroTableProviderTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Round-trip test for {@link AvroTableProvider}: SQL insert followed by SQL select. */
+@RunWith(JUnit4.class)
+public class AvroTableProviderTest {
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private static final String AVRO_FIELD_NAMES = "(name VARCHAR, age BIGINT, country VARCHAR)";
+
+  // Schema of the projection selected by the read query (age, country).
+  private static final Schema OUTPUT_ROW_SCHEMA =
+      Schema.builder().addInt64Field("age").addStringField("country").build();
+
+  /**
+   * Writes two rows into an Avro-backed table with one pipeline, then reads them back with a
+   * filtered projection in a second pipeline and checks the single matching row.
+   */
+  @Test
+  public void testReadAndWriteAvroTable() {
+    File destinationFile = new File(tempFolder.getRoot(), "person-info.avro");
+
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new AvroTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE PersonInfo %s TYPE avro LOCATION '%s'",
+            AVRO_FIELD_NAMES, destinationFile.getAbsolutePath()));
+
+    BeamSqlRelUtils.toPCollection(
+        writePipeline,
+        env.parseQuery(
+            "INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John', 42, 'USA')"));
+
+    // The read below depends on the file produced here, so check that the write succeeded.
+    PipelineResult.State writeState = writePipeline.run().waitUntilFinish();
+    assertEquals(State.DONE, writeState);
+
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(
+            readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo where age > 25"));
+
+    PAssert.that(rows)
+        .containsInAnyOrder(Row.withSchema(OUTPUT_ROW_SCHEMA).addValues(42L, "USA").build());
+
+    PipelineResult.State readState = readPipeline.run().waitUntilFinish();
+    // JUnit's assertEquals takes (expected, actual).
+    assertEquals(State.DONE, readState);
+  }
+}