You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:55:00 UTC

[06/10] beam git commit: Use AutoValue for BigQueryIO.Write

Use AutoValue for BigQueryIO.Write


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c715896
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c715896
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c715896

Branch: refs/heads/master
Commit: 5c715896e683f7bec9f150ea17c78d1dae00ee4c
Parents: 32cba32
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 18:14:39 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:30 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 314 ++++++-------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   8 +-
 2 files changed, 108 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5c715896/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e5db60e..d2f6ba6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -471,9 +471,9 @@ public class BigQueryIO {
       abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
       abstract Builder setQuery(ValueProvider<String> query);
       abstract Builder setValidate(boolean validate);
-      abstract Builder setFlattenResults(@Nullable Boolean flattenResults);
-      abstract Builder setUseLegacySql(@Nullable Boolean useLegacySql);
-      abstract Builder setBigQueryServices(@Nullable BigQueryServices bigQueryServices);
+      abstract Builder setFlattenResults(Boolean flattenResults);
+      abstract Builder setUseLegacySql(Boolean useLegacySql);
+      abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
 
       abstract Read build();
     }
@@ -1364,10 +1364,13 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
-  public static class Write extends PTransform<PCollection<TableRow>, PDone> {
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<TableRow>, PDone> {
+    @VisibleForTesting
     // Maximum number of files in a single partition.
     static final int MAX_NUM_FILES = 10000;
 
+    @VisibleForTesting
     // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
     static final long MAX_SIZE_BYTES = 11 * (1L << 40);
 
@@ -1378,28 +1381,41 @@ public class BigQueryIO {
     // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
     private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
-    @Nullable private final ValueProvider<String> jsonTableRef;
-
-    @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-
-    // Table schema. The schema is required only if the table does not exist.
-    @Nullable private final ValueProvider<String> jsonSchema;
-
-    // Options for creating the table. Valid values are CREATE_IF_NEEDED and
-    // CREATE_NEVER.
-    final CreateDisposition createDisposition;
+    @Nullable abstract ValueProvider<String> getJsonTableRef();
+    @Nullable abstract SerializableFunction<BoundedWindow, TableReference> getTableRefFunction();
+    /** Table schema. The schema is required only if the table does not exist. */
+    @Nullable abstract ValueProvider<String> getJsonSchema();
+    abstract CreateDisposition getCreateDisposition();
+    abstract WriteDisposition getWriteDisposition();
+    @Nullable abstract String getTableDescription();
+    /** An option to indicate if table validation is desired. Default is true. */
+    abstract boolean getValidate();
+    @Nullable abstract BigQueryServices getBigQueryServices();
 
-    // Options for writing to the table. Valid values are WRITE_TRUNCATE,
-    // WRITE_APPEND and WRITE_EMPTY.
-    final WriteDisposition writeDisposition;
+    abstract Builder toBuilder();
 
-    @Nullable
-    final String tableDescription;
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
+      abstract Builder setTableRefFunction(
+          SerializableFunction<BoundedWindow, TableReference> tableRefFunction);
+      abstract Builder setJsonSchema(ValueProvider<String> jsonSchema);
+      abstract Builder setCreateDisposition(CreateDisposition createDisposition);
+      abstract Builder setWriteDisposition(WriteDisposition writeDisposition);
+      abstract Builder setTableDescription(String tableDescription);
+      abstract Builder setValidate(boolean validate);
+      abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
 
-    // An option to indicate if table validation is desired. Default is true.
-    final boolean validate;
+      abstract Write build();
+    }
 
-    @Nullable private BigQueryServices bigQueryServices;
+    private static Builder builder() {
+      return new AutoValue_BigQueryIO_Write.Builder()
+          .setValidate(true)
+          .setBigQueryServices(new BigQueryServicesImpl())
+          .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+          .setWriteDisposition(WriteDisposition.WRITE_EMPTY);
+    }
 
     /**
      * An enumeration type for the BigQuery create disposition strings.
@@ -1474,17 +1490,22 @@ public class BigQueryIO {
      * <p>Refer to {@link #parseTableSpec(String)} for the specification format.
      */
     public static Write to(String tableSpec) {
-      return new Write().withTableSpec(tableSpec);
+      return to(StaticValueProvider.of(tableSpec));
     }
 
     /** Creates a write transformation for the given table. */
-    public static Write to(ValueProvider<String> tableSpec) {
-      return new Write().withTableSpec(tableSpec);
+    public static Write to(TableReference table) {
+      return to(StaticValueProvider.of(toTableSpec(table)));
     }
 
     /** Creates a write transformation for the given table. */
-    public static Write to(TableReference table) {
-      return new Write().withTableRef(table);
+    public static Write to(ValueProvider<String> tableSpec) {
+      return builder()
+          .setJsonTableRef(
+              NestedValueProvider.of(
+                  NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
+                  new TableRefToJson()))
+          .build();
     }
 
     /**
@@ -1499,7 +1520,7 @@ public class BigQueryIO {
      * always return the same table specification.
      */
     public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-      return new Write().withTableSpec(tableSpecFunction);
+      return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
     }
 
     /**
@@ -1511,7 +1532,7 @@ public class BigQueryIO {
      */
     private static Write toTableReference(
         SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-      return new Write().withTableRef(tableRefFunction);
+      return builder().setTableRefFunction(tableRefFunction).build();
     }
 
     private static class TranslateTableSpecFunction implements
@@ -1528,109 +1549,6 @@ public class BigQueryIO {
       }
     }
 
-    private Write() {
-      this(
-          null /* name */,
-          null /* jsonTableRef */,
-          null /* tableRefFunction */,
-          null /* jsonSchema */,
-          CreateDisposition.CREATE_IF_NEEDED,
-          WriteDisposition.WRITE_EMPTY,
-          null /* tableDescription */,
-          true /* validate */,
-          null /* bigQueryServices */);
-    }
-
-    private Write(String name, @Nullable ValueProvider<String> jsonTableRef,
-        @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-        @Nullable ValueProvider<String> jsonSchema,
-        CreateDisposition createDisposition,
-        WriteDisposition writeDisposition,
-        @Nullable String tableDescription,
-        boolean validate,
-        @Nullable BigQueryServices bigQueryServices) {
-      super(name);
-      this.jsonTableRef = jsonTableRef;
-      this.tableRefFunction = tableRefFunction;
-      this.jsonSchema = jsonSchema;
-      this.createDisposition = checkNotNull(createDisposition, "createDisposition");
-      this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
-      this.tableDescription = tableDescription;
-      this.validate = validate;
-      this.bigQueryServices = bigQueryServices;
-    }
-
-    /**
-     * Returns a copy of this write transformation, but writing to the specified table. Refer to
-     * {@link #parseTableSpec(String)} for the specification format.
-     *
-     * <p>Does not modify this object.
-     */
-    private Write withTableSpec(String tableSpec) {
-      return withTableRef(NestedValueProvider.of(
-          StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
-    }
-
-    /**
-     * Returns a copy of this write transformation, but writing to the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withTableRef(TableReference table) {
-      return withTableSpec(StaticValueProvider.of(toTableSpec(table)));
-    }
-
-    /**
-     * Returns a copy of this write transformation, but writing to the specified table. Refer to
-     * {@link #parseTableSpec(String)} for the specification format.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withTableSpec(ValueProvider<String> tableSpec) {
-      return withTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
-    }
-
-    /**
-     * Returns a copy of this write transformation, but writing to the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    private Write withTableRef(ValueProvider<TableReference> table) {
-      return new Write(name,
-          NestedValueProvider.of(table, new TableRefToJson()),
-          tableRefFunction, jsonSchema, createDisposition,
-          writeDisposition, tableDescription, validate, bigQueryServices);
-    }
-
-    /**
-     * Returns a copy of this write transformation, but using the specified function to determine
-     * which table to write to for each window.
-     *
-     * <p>Does not modify this object.
-     *
-     * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
-     * should always return the same table specification.
-     */
-    private Write withTableSpec(
-        SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-      return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
-    }
-
-    /**
-     * Returns a copy of this write transformation, but using the specified function to determine
-     * which table to write to for each window.
-     *
-     * <p>Does not modify this object.
-     *
-     * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
-     * always return the same table reference.
-     */
-    private Write withTableRef(
-        SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-          writeDisposition, tableDescription, validate, bigQueryServices);
-    }
-
     /**
      * Returns a copy of this write transformation, but using the specified schema for rows
      * to be written.
@@ -1638,18 +1556,18 @@ public class BigQueryIO {
      * <p>Does not modify this object.
      */
     public Write withSchema(TableSchema schema) {
-      return new Write(name, jsonTableRef, tableRefFunction,
-          StaticValueProvider.of(toJsonString(schema)),
-          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      return toBuilder()
+          .setJsonSchema(StaticValueProvider.of(toJsonString(schema)))
+          .build();
     }
 
     /**
      * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
      */
     public Write withSchema(ValueProvider<TableSchema> schema) {
-      return new Write(name, jsonTableRef, tableRefFunction,
-          NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
-          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      return toBuilder()
+          .setJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema()))
+          .build();
     }
 
     /**
@@ -1658,8 +1576,7 @@ public class BigQueryIO {
      * <p>Does not modify this object.
      */
     public Write withCreateDisposition(CreateDisposition createDisposition) {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
-          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      return toBuilder().setCreateDisposition(createDisposition).build();
     }
 
     /**
@@ -1668,8 +1585,7 @@ public class BigQueryIO {
      * <p>Does not modify this object.
      */
     public Write withWriteDisposition(WriteDisposition writeDisposition) {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
-          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      return toBuilder().setWriteDisposition(writeDisposition).build();
     }
 
     /**
@@ -1677,9 +1593,8 @@ public class BigQueryIO {
      *
      * <p>Does not modify this object.
      */
-    public Write withTableDescription(@Nullable String tableDescription) {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
-          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    public Write withTableDescription(String tableDescription) {
+      return toBuilder().setTableDescription(tableDescription).build();
     }
 
     /**
@@ -1688,14 +1603,12 @@ public class BigQueryIO {
      * <p>Does not modify this object.
      */
     public Write withoutValidation() {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-          writeDisposition, tableDescription, false, bigQueryServices);
+      return toBuilder().setValidate(false).build();
     }
 
     @VisibleForTesting
     Write withTestServices(BigQueryServices testServices) {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-          writeDisposition, tableDescription, validate, testServices);
+      return toBuilder().setBigQueryServices(testServices).build();
     }
 
     private static void verifyTableNotExistOrEmpty(
@@ -1724,23 +1637,25 @@ public class BigQueryIO {
 
       // Exactly one of the table and table reference can be configured.
       checkState(
-          jsonTableRef != null || tableRefFunction != null,
+          getJsonTableRef() != null || getTableRefFunction() != null,
           "must set the table reference of a BigQueryIO.Write transform");
       checkState(
-          jsonTableRef == null || tableRefFunction == null,
+          getJsonTableRef() == null || getTableRefFunction() == null,
           "Cannot set both a table reference and a table function for a BigQueryIO.Write"
               + " transform");
 
       // Require a schema if creating one or more tables.
       checkArgument(
-          createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+          getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED || getJsonSchema() != null,
           "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
 
       // The user specified a table.
-      if (jsonTableRef != null && validate) {
+      BigQueryServices bqServices =
+          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
+      if (getJsonTableRef() != null && getValidate()) {
         TableReference table = getTableWithDefaultProject(options).get();
 
-        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+        DatasetService datasetService = bqServices.getDatasetService(options);
         // Check for destination table presence and emptiness for early failure notification.
         // Note that a presence check can fail when the table or dataset is created by an earlier
         // stage of the pipeline. For these cases the #withoutValidation method can be used to
@@ -1754,22 +1669,22 @@ public class BigQueryIO {
         }
       }
 
-      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
+      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || getTableRefFunction() != null) {
         // We will use BigQuery's streaming write API -- validate supported dispositions.
-        if (tableRefFunction != null) {
+        if (getTableRefFunction() != null) {
           checkArgument(
-              createDisposition != CreateDisposition.CREATE_NEVER,
+              getCreateDisposition() != CreateDisposition.CREATE_NEVER,
               "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
               + " function.");
         }
-        if (jsonSchema == null) {
+        if (getJsonSchema() == null) {
           checkArgument(
-              createDisposition == CreateDisposition.CREATE_NEVER,
+              getCreateDisposition() == CreateDisposition.CREATE_NEVER,
               "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
         }
 
         checkArgument(
-            writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+            getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
             "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
                 + " when using a tablespec function.");
       } else {
@@ -1778,7 +1693,7 @@ public class BigQueryIO {
         checkArgument(
             !Strings.isNullOrEmpty(tempLocation),
             "BigQueryIO.Write needs a GCS temp location to store temp files.");
-        if (bigQueryServices == null) {
+        if (bqServices == null) {
           try {
             GcsPath.fromUri(tempLocation);
           } catch (IllegalArgumentException e) {
@@ -1796,17 +1711,18 @@ public class BigQueryIO {
     public PDone expand(PCollection<TableRow> input) {
       Pipeline p = input.getPipeline();
       BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-      BigQueryServices bqServices = getBigQueryServices();
+      BigQueryServices bqServices =
+          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
 
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
       // StreamWithDeDup and BigQuery's streaming import API.
-      if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
+      if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
         return input.apply(
-            new StreamWithDeDup(getTable(), tableRefFunction,
-                jsonSchema == null ? null : NestedValueProvider.of(
-                    jsonSchema, new JsonSchemaToTableSchema()),
-                createDisposition,
-                tableDescription,
+            new StreamWithDeDup(getTable(), getTableRefFunction(),
+                getJsonSchema() == null ? null : NestedValueProvider.of(
+                    getJsonSchema(), new JsonSchemaToTableSchema()),
+                getCreateDisposition(),
+                getTableDescription(),
                 bqServices));
       }
 
@@ -1874,10 +1790,10 @@ public class BigQueryIO {
               jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
-              jsonSchema,
+              getJsonSchema(),
               WriteDisposition.WRITE_EMPTY,
               CreateDisposition.CREATE_IF_NEEDED,
-              tableDescription))
+              getTableDescription()))
           .withSideInputs(jobIdTokenView));
 
       PCollectionView<Iterable<String>> tempTablesView = tempTables
@@ -1887,10 +1803,10 @@ public class BigQueryIO {
               bqServices,
               jobIdTokenView,
               NestedValueProvider.of(table, new TableRefToJson()),
-              writeDisposition,
-              createDisposition,
+              getWriteDisposition(),
+              getCreateDisposition(),
               tempTablesView,
-              tableDescription))
+              getTableDescription()))
           .withSideInputs(tempTablesView, jobIdTokenView));
 
       // Write single partition to final table
@@ -1902,10 +1818,10 @@ public class BigQueryIO {
               jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
-              jsonSchema,
-              writeDisposition,
-              createDisposition,
-              tableDescription))
+              getJsonSchema(),
+              getWriteDisposition(),
+              getCreateDisposition(),
+              getTableDescription()))
           .withSideInputs(jobIdTokenView));
 
       return PDone.in(input.getPipeline());
@@ -1969,41 +1885,31 @@ public class BigQueryIO {
       super.populateDisplayData(builder);
 
       builder
-          .addIfNotNull(DisplayData.item("table", jsonTableRef)
+          .addIfNotNull(DisplayData.item("table", getJsonTableRef())
             .withLabel("Table Reference"))
-          .addIfNotNull(DisplayData.item("schema", jsonSchema)
+          .addIfNotNull(DisplayData.item("schema", getJsonSchema())
             .withLabel("Table Schema"));
 
-      if (tableRefFunction != null) {
-        builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+      if (getTableRefFunction() != null) {
+        builder.add(DisplayData.item("tableFn", getTableRefFunction().getClass())
           .withLabel("Table Reference Function"));
       }
 
       builder
-          .add(DisplayData.item("createDisposition", createDisposition.toString())
+          .add(DisplayData.item("createDisposition", getCreateDisposition().toString())
             .withLabel("Table CreateDisposition"))
-          .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+          .add(DisplayData.item("writeDisposition", getWriteDisposition().toString())
             .withLabel("Table WriteDisposition"))
-          .addIfNotDefault(DisplayData.item("validation", validate)
+          .addIfNotDefault(DisplayData.item("validation", getValidate())
             .withLabel("Validation Enabled"), true)
-          .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+          .addIfNotNull(DisplayData.item("tableDescription", getTableDescription())
             .withLabel("Table Description"));
     }
 
-    /** Returns the create disposition. */
-    public CreateDisposition getCreateDisposition() {
-      return createDisposition;
-    }
-
-    /** Returns the write disposition. */
-    public WriteDisposition getWriteDisposition() {
-      return writeDisposition;
-    }
-
     /** Returns the table schema. */
     public TableSchema getSchema() {
       return fromJsonString(
-          jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+          getJsonSchema() == null ? null : getJsonSchema().get(), TableSchema.class);
     }
 
     /**
@@ -2036,20 +1942,8 @@ public class BigQueryIO {
     /** Returns the table reference, or {@code null}. */
     @Nullable
     public ValueProvider<TableReference> getTable() {
-      return jsonTableRef == null ? null :
-          NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
-    }
-
-    /** Returns {@code true} if table validation is enabled. */
-    public boolean getValidate() {
-      return validate;
-    }
-
-    private BigQueryServices getBigQueryServices() {
-      if (bigQueryServices == null) {
-        bigQueryServices = new BigQueryServicesImpl();
-      }
-      return bigQueryServices;
+      return getJsonTableRef() == null ? null :
+          NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
     }
 
     static class TableRowWriter {

http://git-wip-us.apache.org/repos/asf/beam/blob/5c715896/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index fdaa81c..888d9c1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -712,10 +712,10 @@ public class BigQueryIOTest implements Serializable {
     assertEquals(dataset, write.getTable().get().getDatasetId());
     assertEquals(table, write.getTable().get().getTableId());
     assertEquals(schema, write.getSchema());
-    assertEquals(createDisposition, write.createDisposition);
-    assertEquals(writeDisposition, write.writeDisposition);
-    assertEquals(tableDescription, write.tableDescription);
-    assertEquals(validate, write.validate);
+    assertEquals(createDisposition, write.getCreateDisposition());
+    assertEquals(writeDisposition, write.getWriteDisposition());
+    assertEquals(tableDescription, write.getTableDescription());
+    assertEquals(validate, write.getValidate());
   }
 
   @Before