You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:32 UTC

[36/50] [abbrv] beam git commit: [BEAM-2532] Memoizes TableSchema in BigQuerySourceBase

[BEAM-2532] Memoizes TableSchema in BigQuerySourceBase

Parses the JSON table schema once and reuses the memoized result, instead of re-parsing it for every record.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e86c004d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e86c004d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e86c004d

Branch: refs/heads/DSL_SQL
Commit: e86c004de5d4b5f8bd0c3c53207cf3c1760f5d8e
Parents: d510175
Author: Neville Li <ne...@spotify.com>
Authored: Tue Jul 18 09:07:21 2017 -0400
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 18 22:28:57 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java  | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e86c004d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 2de60a2..2b1eafe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -29,11 +29,16 @@ import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.AvroSource;
@@ -168,10 +173,12 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
 
     SerializableFunction<GenericRecord, TableRow> function =
         new SerializableFunction<GenericRecord, TableRow>() {
+          private Supplier<TableSchema> schema = Suppliers.memoize(
+              Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema)));
+
           @Override
           public TableRow apply(GenericRecord input) {
-            return BigQueryAvroUtils.convertGenericRecordToTableRow(
-                input, BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class));
+            return BigQueryAvroUtils.convertGenericRecordToTableRow(input, schema.get());
           }};
 
     List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
@@ -182,6 +189,14 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     return ImmutableList.copyOf(avroSources);
   }
 
+  private static class TableSchemaFunction implements Serializable, Function<String, TableSchema> {
+    @Nullable
+    @Override
+    public TableSchema apply(@Nullable String input) {
+      return BigQueryHelpers.fromJsonString(input, TableSchema.class);
+    }
+  }
+
   protected static class BigQueryReader extends BoundedReader<TableRow> {
     private final BigQuerySourceBase source;
     private final BigQueryServices.BigQueryJsonReader reader;