Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/29 05:10:30 UTC

[1/3] beam git commit: [BEAM-1022] Add testing coverage for BigQuery streaming writes

Repository: beam
Updated Branches:
  refs/heads/release-0.4.0 482016037 -> e77080a4d


[BEAM-1022] Add testing coverage for BigQuery streaming writes


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c53a332e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c53a332e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c53a332e

Branch: refs/heads/release-0.4.0
Commit: c53a332e23bc6a6441a0d52c57c9fbe5e00171b5
Parents: 4820160
Author: Reuven Lax <re...@google.com>
Authored: Thu Nov 17 10:57:41 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 14:22:11 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  48 +-
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |   7 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 121 ++++-
 .../io/gcp/bigquery/BigQueryTableInserter.java  | 217 ---------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 456 +++++++++++++++----
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 139 +++++-
 .../gcp/bigquery/BigQueryTableInserterTest.java | 245 ----------
 .../sdk/io/gcp/bigquery/BigQueryUtilTest.java   |  50 +-
 8 files changed, 655 insertions(+), 628 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 0be8567..28049ed 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -24,7 +24,6 @@ import static com.google.common.base.Preconditions.checkState;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.api.client.json.JsonFactory;
-import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
@@ -33,6 +32,7 @@ import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
@@ -1796,8 +1796,8 @@ public class BigQueryIO {
        * <p>Does not modify this object.
        */
       public Bound withCreateDisposition(CreateDisposition createDisposition) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, bigQueryServices);
+        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
+            createDisposition, writeDisposition, validate, bigQueryServices);
       }
 
       /**
@@ -1806,8 +1806,8 @@ public class BigQueryIO {
        * <p>Does not modify this object.
        */
       public Bound withWriteDisposition(WriteDisposition writeDisposition) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, bigQueryServices);
+        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
+            createDisposition, writeDisposition, validate, bigQueryServices);
       }
 
       /**
@@ -2136,7 +2136,8 @@ public class BigQueryIO {
       /** Returns the table reference, or {@code null}. */
       @Nullable
       public ValueProvider<TableReference> getTable() {
-        return NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+        return jsonTableRef == null ? null :
+            NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
       }
 
       /** Returns {@code true} if table validation is enabled. */
@@ -2550,6 +2551,13 @@ public class BigQueryIO {
     }
   }
 
+  /**
+   * Clear the cached map of created tables. Used for testing.
+   */
+  @VisibleForTesting
+  static void clearCreatedTables() {
+    StreamingWriteFn.clearCreatedTables();
+  }
+
   /////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -2585,6 +2593,15 @@ public class BigQueryIO {
       this.bqServices = checkNotNull(bqServices, "bqServices");
     }
 
+    /**
+     * Clear the cached map of created tables. Used for testing.
+     */
+    private static void clearCreatedTables() {
+      synchronized (createdTables) {
+        createdTables.clear();
+      }
+    }
+
     /** Prepares a target BigQuery table. */
     @StartBundle
     public void startBundle(Context context) {
@@ -2626,20 +2643,25 @@ public class BigQueryIO {
     }
 
     public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
-        throws IOException {
+        throws InterruptedException, IOException {
       TableReference tableReference = parseTableSpec(tableSpec);
       if (!createdTables.contains(tableSpec)) {
         synchronized (createdTables) {
           // Another thread may have succeeded in creating the table in the meanwhile, so
           // check again. This check isn't needed for correctness, but we add it to prevent
           // every thread from attempting a create and overwhelming our BigQuery quota.
+          DatasetService datasetService = bqServices.getDatasetService(options);
           if (!createdTables.contains(tableSpec)) {
-            TableSchema tableSchema = JSON_FACTORY.fromString(
-                jsonTableSchema.get(), TableSchema.class);
-            Bigquery client = Transport.newBigQueryClient(options).build();
-            BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
-            inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND,
-                Write.CreateDisposition.CREATE_IF_NEEDED, tableSchema);
+            Table table = datasetService.getTable(
+                tableReference.getProjectId(),
+                tableReference.getDatasetId(),
+                tableReference.getTableId());
+            if (table == null) {
+              TableSchema tableSchema = JSON_FACTORY.fromString(
+                  jsonTableSchema.get(), TableSchema.class);
+              datasetService.createTable(
+                  new Table().setTableReference(tableReference).setSchema(tableSchema));
+            }
             createdTables.add(tableSpec);
           }
         }

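The getOrCreateTable() hunk above replaces the old BigQueryTableInserter path with a
DatasetService lookup wrapped in a double-checked, process-wide cache, so that concurrent
bundles do not each issue a tables.insert call. A minimal standalone sketch of that
pattern (the TableService interface and its method names are illustrative stand-ins, not
the Beam API):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    class CachedTableCreator {
      // Process-wide cache of table specs known to exist, shared across worker threads.
      private static final Set<String> createdTables =
          Collections.synchronizedSet(new HashSet<String>());

      interface TableService {
        boolean tableExists(String tableSpec) throws Exception;
        void createTable(String tableSpec) throws Exception;
      }

      void getOrCreateTable(TableService service, String tableSpec) throws Exception {
        if (!createdTables.contains(tableSpec)) {
          synchronized (createdTables) {
            // Re-check under the lock: another thread may have created the table while we
            // waited. Not required for correctness, but it keeps every thread from issuing
            // its own create call and exhausting the table-creation quota.
            if (!createdTables.contains(tableSpec)) {
              if (!service.tableExists(tableSpec)) {
                service.createTable(tableSpec);
              }
              createdTables.add(tableSpec);
            }
          }
        }
      }
    }
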
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 07dc06e..8ca473d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -114,12 +114,17 @@ interface BigQueryServices extends Serializable {
    */
   interface DatasetService {
     /**
-     * Gets the specified {@link Table} resource by table ID.
+     * Gets the specified {@link Table} resource by table ID, or {@code null} if the table
+     * does not exist.
      */
     Table getTable(String projectId, String datasetId, String tableId)
         throws InterruptedException, IOException;
 
     /**
+     * Creates the specified table if it does not exist.
+     */
+    void createTable(Table table) throws InterruptedException, IOException;
+
+    /**
      * Deletes the table specified by tableId from the dataset.
      * If the table contains data, all the data will be deleted.
      */

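With the revised contract, getTable() returns null for a missing table instead of
surfacing a not-found error, and createTable() succeeds quietly when the table already
exists. A hedged sketch of a caller relying on both guarantees (ensureExists() and the
hard-coded identifiers are illustrative, and the class would need to live in the same
package as the package-private BigQueryServices interface):

    import com.google.api.services.bigquery.model.Table;
    import com.google.api.services.bigquery.model.TableReference;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.io.IOException;

    class EnsureTable {
      static void ensureExists(BigQueryServices.DatasetService datasetService,
          TableSchema schema) throws IOException, InterruptedException {
        Table existing = datasetService.getTable("my-project", "my_dataset", "my_table");
        if (existing == null) {
          TableReference ref = new TableReference()
              .setProjectId("my-project").setDatasetId("my_dataset").setTableId("my_table");
          // createTable returns quietly if the table already exists, so losing a create
          // race against another worker is harmless here.
          datasetService.createTable(new Table().setTableReference(ref).setSchema(schema));
        }
      }
    }
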
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 61f1a1a..4eb8e7b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -23,6 +23,7 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Dataset;
@@ -53,10 +54,12 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.Transport;
 import org.joda.time.Duration;
@@ -281,7 +284,8 @@ class BigQueryServicesImpl implements BigQueryServices {
               "Unable to dry run query: %s, aborting after %d retries.",
               queryConfig, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff).getStatistics();
+          backoff,
+          ALWAYS_RETRY).getStatistics();
     }
 
     /**
@@ -400,7 +404,80 @@ class BigQueryServicesImpl implements BigQueryServices {
               "Unable to get table: %s, aborting after %d retries.",
               tableId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff);
+          backoff,
+          DONT_RETRY_NOT_FOUND);
+    }
+
+    /**
+     * Retry table creation for up to 5 minutes (with exponential backoff) when this user is near
+     * the quota for table creation. This relatively innocuous behavior can happen when BigQueryIO
+     * is configured with a table spec function that uses a different table for each window.
+     */
+    private static final int RETRY_CREATE_TABLE_DURATION_MILLIS =
+        (int) TimeUnit.MINUTES.toMillis(5);
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>If a table with the same name already exists in the dataset, the function simply
+     * returns. In such a case, the existing table doesn't necessarily have the same schema as
+     * specified by the parameter.
+     *
+     * @throws IOException if an error other than the table already existing occurs.
+     */
+    @Override
+    public void createTable(Table table) throws InterruptedException, IOException {
+      LOG.info("Trying to create BigQuery table: {}",
+          BigQueryIO.toTableSpec(table.getTableReference()));
+      BackOff backoff =
+          new ExponentialBackOff.Builder()
+              .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
+              .build();
+
+      tryCreateTable(table, backoff, Sleeper.DEFAULT);
+    }
+
+    @VisibleForTesting
+    @Nullable
+    Table tryCreateTable(Table table, BackOff backoff, Sleeper sleeper)
+        throws IOException {
+      boolean retry = false;
+      while (true) {
+        try {
+          return client.tables().insert(
+              table.getTableReference().getProjectId(),
+              table.getTableReference().getDatasetId(),
+              table).execute();
+        } catch (IOException e) {
+          ApiErrorExtractor extractor = new ApiErrorExtractor();
+          if (extractor.itemAlreadyExists(e)) {
+            // The table already exists, nothing to return.
+            return null;
+          } else if (extractor.rateLimited(e)) {
+            // The request failed because we hit a temporary quota. Back off and try again.
+            try {
+              if (BackOffUtils.next(sleeper, backoff)) {
+                if (!retry) {
+                  LOG.info(
+                      "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
+                      table.getTableReference().getProjectId(),
+                      table.getTableReference().getDatasetId(),
+                      table.getTableReference().getTableId(),
+                      TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
+                  retry = true;
+                }
+                continue;
+              }
+            } catch (InterruptedException e1) {
+              // Restore interrupted state and throw the last failure.
+              Thread.currentThread().interrupt();
+              throw e;
+            }
+          }
+          throw e;
+        }
+      }
     }
 
     /**
@@ -422,7 +499,8 @@ class BigQueryServicesImpl implements BigQueryServices {
               "Unable to delete table: %s, aborting after %d retries.",
               tableId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff);
+          backoff,
+          ALWAYS_RETRY);
     }
 
     @Override
@@ -437,7 +515,8 @@ class BigQueryServicesImpl implements BigQueryServices {
               "Unable to list table data: %s, aborting after %d retries.",
               tableId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff);
+          backoff,
+          ALWAYS_RETRY);
       return dataList.getRows() == null || dataList.getRows().isEmpty();
     }
 
@@ -460,7 +539,8 @@ class BigQueryServicesImpl implements BigQueryServices {
               "Unable to get dataset: %s, aborting after %d retries.",
               datasetId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff);
+          backoff,
+          DONT_RETRY_NOT_FOUND);
     }
 
     /**
@@ -543,7 +623,8 @@ class BigQueryServicesImpl implements BigQueryServices {
               "Unable to delete table: %s, aborting after %d retries.",
               datasetId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
-          backoff);
+          backoff,
+          ALWAYS_RETRY);
     }
 
     @VisibleForTesting
@@ -684,8 +765,8 @@ class BigQueryServicesImpl implements BigQueryServices {
     public long insertAll(
         TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
         throws IOException, InterruptedException {
-          return insertAll(
-              ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+      return insertAll(
+          ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
     }
   }
 
@@ -747,12 +828,31 @@ class BigQueryServicesImpl implements BigQueryServices {
     }
   }
 
+  static final SerializableFunction<IOException, Boolean> DONT_RETRY_NOT_FOUND =
+      new SerializableFunction<IOException, Boolean>() {
+        @Override
+        public Boolean apply(IOException input) {
+          ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+          return !errorExtractor.itemNotFound(input);
+        }
+      };
+
+  static final SerializableFunction<IOException, Boolean> ALWAYS_RETRY =
+      new SerializableFunction<IOException, Boolean>() {
+        @Override
+        public Boolean apply(IOException input) {
+          return true;
+        }
+      };
+
   @VisibleForTesting
   static <T> T executeWithRetries(
       AbstractGoogleClientRequest<T> request,
       String errorMessage,
       Sleeper sleeper,
-      BackOff backoff)
+      BackOff backoff,
+      SerializableFunction<IOException, Boolean> shouldRetry)
       throws IOException, InterruptedException {
     Exception lastException = null;
     do {
@@ -761,6 +861,9 @@ class BigQueryServicesImpl implements BigQueryServices {
       } catch (IOException e) {
         LOG.warn("Ignore the error and retry the request.", e);
         lastException = e;
+        if (!shouldRetry.apply(e)) {
+          break;
+        }
       }
     } while (nextBackOff(sleeper, backoff));
     throw new IOException(

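executeWithRetries() now threads a shouldRetry predicate through every call site:
DONT_RETRY_NOT_FOUND lets getTable() and getDataset() fail fast on a 404, while
ALWAYS_RETRY preserves the old retry-everything behavior elsewhere. A self-contained
sketch of a predicate-gated retry loop (RpcCall, RetryPredicate, and the fixed sleep are
simplifications; the real code drives the loop with BackOff and Sleeper):

    import java.io.IOException;

    class RetryWithPredicate {
      interface RpcCall<T> { T execute() throws IOException; }
      interface RetryPredicate { boolean shouldRetry(IOException e); }

      // maxAttempts must be >= 1.
      static <T> T executeWithRetries(RpcCall<T> call, RetryPredicate shouldRetry,
          int maxAttempts, long sleepMillis) throws IOException, InterruptedException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
          try {
            return call.execute();
          } catch (IOException e) {
            last = e;
            // Fail fast on errors the predicate classifies as permanent, e.g. 404 Not Found.
            if (!shouldRetry.shouldRetry(e)) {
              break;
            }
            // A production version would use exponential backoff instead of a fixed sleep.
            Thread.sleep(sleepMillis);
          }
        }
        throw last;
      }
    }
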
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
deleted file mode 100644
index a64dc9f..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.ExponentialBackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Inserts rows into BigQuery.
- */
-class BigQueryTableInserter {
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class);
-
-  private final Bigquery client;
-
-  /**
-   * Constructs a new row inserter.
-   *
-   * @param client a BigQuery client
-   * @param options a PipelineOptions object
-   */
-  BigQueryTableInserter(Bigquery client, PipelineOptions options) {
-    this.client = client;
-  }
-
-  /**
-   * Retrieves or creates the table.
-   *
-   * <p>The table is checked to conform to insertion requirements as specified
-   * by WriteDisposition and CreateDisposition.
-   *
-   * <p>If table truncation is requested (WriteDisposition.WRITE_TRUNCATE), then
-   * this will re-create the table if necessary to ensure it is empty.
-   *
-   * <p>If an empty table is required (WriteDisposition.WRITE_EMPTY), then this
-   * will fail if the table exists and is not empty.
-   *
-   * <p>When constructing a table, a {@code TableSchema} must be available.  If a
-   * schema is provided, then it will be used.  If no schema is provided, but
-   * an existing table is being cleared (WRITE_TRUNCATE option above), then
-   * the existing schema will be re-used.  If no schema is available, then an
-   * {@code IOException} is thrown.
-   */
-  Table getOrCreateTable(
-      TableReference ref,
-      WriteDisposition writeDisposition,
-      CreateDisposition createDisposition,
-      @Nullable TableSchema schema) throws IOException {
-    // Check if table already exists.
-    Bigquery.Tables.Get get = client.tables()
-        .get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-    Table table = null;
-    try {
-      table = get.execute();
-    } catch (IOException e) {
-      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-      if (!errorExtractor.itemNotFound(e)
-          || createDisposition != CreateDisposition.CREATE_IF_NEEDED) {
-        // Rethrow.
-        throw e;
-      }
-    }
-
-    // If we want an empty table, and it isn't, then delete it first.
-    if (table != null) {
-      if (writeDisposition == WriteDisposition.WRITE_APPEND) {
-        return table;
-      }
-
-      boolean empty = isEmpty(ref);
-      if (empty) {
-        if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
-          LOG.info("Empty table found, not removing {}", BigQueryIO.toTableSpec(ref));
-        }
-        return table;
-
-      } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
-        throw new IOException("WriteDisposition is WRITE_EMPTY, "
-            + "but table is not empty");
-      }
-
-      // Reuse the existing schema if none was provided.
-      if (schema == null) {
-        schema = table.getSchema();
-      }
-
-      // Delete table and fall through to re-creating it below.
-      LOG.info("Deleting table {}", BigQueryIO.toTableSpec(ref));
-      Bigquery.Tables.Delete delete = client.tables()
-          .delete(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-      delete.execute();
-    }
-
-    if (schema == null) {
-      throw new IllegalArgumentException(
-          "Table schema required for new table.");
-    }
-
-    // Create the table.
-    return tryCreateTable(ref, schema);
-  }
-
-  /**
-   * Checks if a table is empty.
-   */
-  private boolean isEmpty(TableReference ref) throws IOException {
-    Bigquery.Tabledata.List list = client.tabledata()
-        .list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-    list.setMaxResults(1L);
-    TableDataList dataList = list.execute();
-
-    return dataList.getRows() == null || dataList.getRows().isEmpty();
-  }
-
-  /**
-   * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the
-   * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
-   * configured with a table spec function to use different tables for each window.
-   */
-  private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int) TimeUnit.MINUTES.toMillis(5);
-
-  /**
-   * Tries to create the BigQuery table.
-   * If a table with the same name already exists in the dataset, the table
-   * creation fails, and the function returns null.  In such a case,
-   * the existing table doesn't necessarily have the same schema as specified
-   * by the parameter.
-   *
-   * @param schema Schema of the new BigQuery table.
-   * @return The newly created BigQuery table information, or null if the table
-   *     with the same name already exists.
-   * @throws IOException if other error than already existing table occurs.
-   */
-  @Nullable
-  private Table tryCreateTable(TableReference ref, TableSchema schema) throws IOException {
-    LOG.info("Trying to create BigQuery table: {}", BigQueryIO.toTableSpec(ref));
-    BackOff backoff =
-        new ExponentialBackOff.Builder()
-            .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
-            .build();
-
-    Table table = new Table().setTableReference(ref).setSchema(schema);
-    return tryCreateTable(table, ref.getProjectId(), ref.getDatasetId(), backoff, Sleeper.DEFAULT);
-  }
-
-  @VisibleForTesting
-  @Nullable
-  Table tryCreateTable(
-      Table table, String projectId, String datasetId, BackOff backoff, Sleeper sleeper)
-          throws IOException {
-    boolean retry = false;
-    while (true) {
-      try {
-        return client.tables().insert(projectId, datasetId, table).execute();
-      } catch (IOException e) {
-        ApiErrorExtractor extractor = new ApiErrorExtractor();
-        if (extractor.itemAlreadyExists(e)) {
-          // The table already exists, nothing to return.
-          return null;
-        } else if (extractor.rateLimited(e)) {
-          // The request failed because we hit a temporary quota. Back off and try again.
-          try {
-            if (BackOffUtils.next(sleeper, backoff)) {
-              if (!retry) {
-                LOG.info(
-                    "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
-                    projectId,
-                    datasetId,
-                    table.getTableReference().getTableId(),
-                    TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
-                retry = true;
-              }
-              continue;
-            }
-          } catch (InterruptedException e1) {
-            // Restore interrupted state and throw the last failure.
-            Thread.currentThread().interrupt();
-            throw e;
-          }
-        }
-        throw e;
-      }
-    }
-  }
-}

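For reference, the disposition handling that the deleted getOrCreateTable() implemented
reduces to a small decision procedure over "does the table exist" and "is it empty". A
schematic sketch of just that logic (the enum and strings mirror the deleted code above;
CreateDisposition handling is omitted):

    import java.io.IOException;

    class DispositionCheck {
      enum WriteDisposition { WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY }

      static String decide(boolean exists, boolean empty, WriteDisposition wd)
          throws IOException {
        if (!exists) {
          return "create the table (a schema is required)";
        }
        switch (wd) {
          case WRITE_APPEND:
            return "reuse the table as-is";
          case WRITE_TRUNCATE:
            return empty ? "reuse the table (already empty)" : "delete and re-create";
          case WRITE_EMPTY:
            if (!empty) {
              throw new IOException("WriteDisposition is WRITE_EMPTY, but table is not empty");
            }
            return "reuse the table (verified empty)";
          default:
            throw new AssertionError("unreachable");
        }
      }
    }
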
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 54ec2bb..b78316f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -23,11 +23,13 @@ import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.fromJsonString;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
@@ -58,18 +60,20 @@ import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Table.Cell;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -85,6 +89,8 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -133,6 +139,9 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
@@ -146,6 +155,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matchers;
+import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -478,39 +488,81 @@ public class BigQueryIOTest implements Serializable {
     }
   }
 
-  /** A fake dataset service that can be serialized, for use in testReadFromTable. */
-  private static class FakeDatasetService implements DatasetService, Serializable {
-    private com.google.common.collect.Table<String, String, Map<String, Table>> tables =
-        HashBasedTable.create();
-
-    public FakeDatasetService withTable(
-        String projectId, String datasetId, String tableId, Table table) throws IOException {
-      Map<String, Table> dataset = tables.get(projectId, datasetId);
-      if (dataset == null) {
-        dataset = new HashMap<>();
-        tables.put(projectId, datasetId, dataset);
-      }
-      dataset.put(tableId, table);
+  private static class TableContainer {
+    Table table;
+    List<TableRow> rows;
+    List<String> ids;
+
+    TableContainer(Table table) {
+      this.table = table;
+      this.rows = new ArrayList<>();
+      this.ids = new ArrayList<>();
+    }
+
+    TableContainer addRow(TableRow row, String id) {
+      rows.add(row);
+      ids.add(id);
       return this;
     }
 
+    Table getTable() {
+      return table;
+    }
+
+    List<TableRow> getRows() {
+      return rows;
+    }
+  }
+
+  // Table information must be static, as each ParDo will get a separate instance of
+  // FakeDatasetService, and they must all modify the same storage.
+  private static com.google.common.collect.Table<String, String, Map<String, TableContainer>>
+      tables = HashBasedTable.create();
+
+  /** A fake dataset service that can be serialized, for use in testReadFromTable. */
+  private static class FakeDatasetService implements DatasetService, Serializable {
+
     @Override
     public Table getTable(String projectId, String datasetId, String tableId)
         throws InterruptedException, IOException {
-      Map<String, Table> dataset =
-          checkNotNull(
-              tables.get(projectId, datasetId),
-              "Tried to get a table %s:%s.%s from %s, but no such table was set",
-              projectId,
-              datasetId,
-              tableId,
-              FakeDatasetService.class.getSimpleName());
-      return checkNotNull(dataset.get(tableId),
-          "Tried to get a table %s:%s.%s from %s, but no such table was set",
-          projectId,
-          datasetId,
-          tableId,
-          FakeDatasetService.class.getSimpleName());
+      synchronized (tables) {
+        Map<String, TableContainer> dataset =
+            checkNotNull(
+                tables.get(projectId, datasetId),
+                "Tried to get a dataset %s:%s from %s, but no such dataset was set",
+                projectId,
+                datasetId,
+                FakeDatasetService.class.getSimpleName());
+        TableContainer tableContainer = dataset.get(tableId);
+        return tableContainer == null ? null : tableContainer.getTable();
+      }
+    }
+
+    public List<TableRow> getAllRows(String projectId, String datasetId, String tableId)
+        throws InterruptedException, IOException {
+      synchronized (tables) {
+        return getTableContainer(projectId, datasetId, tableId).getRows();
+      }
+    }
+
+    private TableContainer getTableContainer(String projectId, String datasetId, String tableId)
+        throws InterruptedException, IOException {
+      synchronized (tables) {
+        Map<String, TableContainer> dataset =
+            checkNotNull(
+                tables.get(projectId, datasetId),
+                "Tried to get a dataset %s:%s from %s, but no such dataset was set",
+                projectId,
+                datasetId,
+                FakeDatasetService.class.getSimpleName());
+        return checkNotNull(dataset.get(tableId),
+            "Tried to get a table %s:%s.%s from %s, but no such table was set",
+            projectId,
+            datasetId,
+            tableId,
+            FakeDatasetService.class.getSimpleName());
+      }
     }
 
     @Override
@@ -519,6 +571,26 @@ public class BigQueryIOTest implements Serializable {
       throw new UnsupportedOperationException("Unsupported");
     }
 
+    @Override
+    public void createTable(Table table) throws IOException {
+      TableReference tableReference = table.getTableReference();
+      synchronized (tables) {
+        Map<String, TableContainer> dataset =
+            checkNotNull(
+                tables.get(tableReference.getProjectId(), tableReference.getDatasetId()),
+                "Tried to get a dataset %s:%s from %s, but no such dataset was set",
+                tableReference.getProjectId(),
+                tableReference.getDatasetId(),
+                FakeDatasetService.class.getSimpleName());
+        TableContainer tableContainer = dataset.get(tableReference.getTableId());
+        if (tableContainer == null) {
+          tableContainer = new TableContainer(table);
+          dataset.put(tableReference.getTableId(), tableContainer);
+        }
+      }
+    }
+
     @Override
     public boolean isTableEmpty(String projectId, String datasetId, String tableId)
         throws IOException, InterruptedException {
@@ -536,7 +608,13 @@ public class BigQueryIOTest implements Serializable {
     public void createDataset(
         String projectId, String datasetId, String location, String description)
         throws IOException, InterruptedException {
-      throw new UnsupportedOperationException("Unsupported");
+      synchronized (tables) {
+        Map<String, TableContainer> dataset = tables.get(projectId, datasetId);
+        if (dataset == null) {
+          dataset = new HashMap<>();
+          tables.put(projectId, datasetId, dataset);
+        }
+      }
     }
 
     @Override
@@ -549,55 +627,18 @@ public class BigQueryIOTest implements Serializable {
     public long insertAll(
         TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
         throws IOException, InterruptedException {
-      throw new UnsupportedOperationException("Unsupported");
-    }
-
-    ////////////////////////////////// SERIALIZATION METHODS ////////////////////////////////////
-    private void writeObject(ObjectOutputStream out) throws IOException {
-      out.writeObject(replaceTablesWithBytes(this.tables));
-    }
-
-    private com.google.common.collect.Table<String, String, Map<String, byte[]>>
-    replaceTablesWithBytes(
-        com.google.common.collect.Table<String, String, Map<String, Table>> toCopy)
-        throws IOException {
-      com.google.common.collect.Table<String, String, Map<String, byte[]>> copy =
-          HashBasedTable.create();
-      for (Cell<String, String, Map<String, Table>> cell : toCopy.cellSet()) {
-        HashMap<String, byte[]> dataset = new HashMap<>();
-        copy.put(cell.getRowKey(), cell.getColumnKey(), dataset);
-        for (Map.Entry<String, Table> dsTables : cell.getValue().entrySet()) {
-          dataset.put(
-              dsTables.getKey(), Transport.getJsonFactory().toByteArray(dsTables.getValue()));
+      synchronized (tables) {
+        assertEquals(rowList.size(), insertIdList.size());
+
+        long dataSize = 0;
+        TableContainer tableContainer = getTableContainer(
+            ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+        for (int i = 0; i < rowList.size(); ++i) {
+          tableContainer.addRow(rowList.get(i), insertIdList.get(i));
+          dataSize += rowList.get(i).toString().length();
         }
+        return dataSize;
       }
-      return copy;
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-      com.google.common.collect.Table<String, String, Map<String, byte[]>> tablesTable =
-          (com.google.common.collect.Table<String, String, Map<String, byte[]>>) in.readObject();
-      this.tables = replaceBytesWithTables(tablesTable);
-    }
-
-    private com.google.common.collect.Table<String, String, Map<String, Table>>
-    replaceBytesWithTables(
-        com.google.common.collect.Table<String, String, Map<String, byte[]>> tablesTable)
-        throws IOException {
-      com.google.common.collect.Table<String, String, Map<String, Table>> copy =
-          HashBasedTable.create();
-      for (Cell<String, String, Map<String, byte[]>> cell : tablesTable.cellSet()) {
-        HashMap<String, Table> dataset = new HashMap<>();
-        copy.put(cell.getRowKey(), cell.getColumnKey(), dataset);
-        for (Map.Entry<String, byte[]> dsTables : cell.getValue().entrySet()) {
-          Table table =
-              Transport.getJsonFactory()
-                  .createJsonParser(new ByteArrayInputStream(dsTables.getValue()))
-                  .parse(Table.class);
-          dataset.put(dsTables.getKey(), table);
-        }
-      }
-      return copy;
     }
   }
 
@@ -658,6 +699,8 @@ public class BigQueryIOTest implements Serializable {
   @Before
   public void setUp() throws IOException {
     MockitoAnnotations.initMocks(this);
+    tables = HashBasedTable.create();
+    BigQueryIO.clearCreatedTables();
   }
 
   @Test
@@ -716,8 +759,11 @@ public class BigQueryIOTest implements Serializable {
     bqOptions.setProject(projectId);
     bqOptions.setTempLocation("gs://testbucket/testdir");
 
-    FakeDatasetService fakeDatasetService =
-        new FakeDatasetService().withTable(projectId, datasetId, tableId, null);
+    FakeDatasetService fakeDatasetService = new FakeDatasetService();
+    fakeDatasetService.createDataset(projectId, datasetId, "", "");
+    TableReference tableReference =
+        new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId);
+    fakeDatasetService.createTable(new Table().setTableReference(tableReference));
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
@@ -765,7 +811,7 @@ public class BigQueryIOTest implements Serializable {
     p.apply("ReadMyTable",
         BigQueryIO.Read
             .from("foo.com:project:somedataset.sometable")
-        .fromQuery("query"));
+            .fromQuery("query"));
     p.run();
   }
 
@@ -829,7 +875,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   @Category(NeedsRunner.class)
-  public void testReadFromTable() throws IOException {
+  public void testReadFromTable() throws IOException, InterruptedException {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("defaultProject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
@@ -850,10 +896,15 @@ public class BigQueryIOTest implements Serializable {
                 ImmutableList.of(
                     new TableFieldSchema().setName("name").setType("STRING"),
                     new TableFieldSchema().setName("number").setType("INTEGER"))));
+    sometable.setTableReference(
+        new TableReference()
+            .setProjectId("non-executing-project")
+            .setDatasetId("somedataset")
+            .setTableId("sometable"));
     sometable.setNumBytes(1024L * 1024L);
-    FakeDatasetService fakeDatasetService =
-        new FakeDatasetService()
-            .withTable("non-executing-project", "somedataset", "sometable", sometable);
+    FakeDatasetService fakeDatasetService = new FakeDatasetService();
+    fakeDatasetService.createDataset("non-executing-project", "somedataset", "", "");
+    fakeDatasetService.createTable(sometable);
     SerializableFunction<Void, Schema> schemaGenerator =
         new SerializableFunction<Void, Schema>() {
           @Override
@@ -945,6 +996,216 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   @Category(NeedsRunner.class)
+  public void testStreamingWrite() throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeDatasetService datasetService = new FakeDatasetService();
+    datasetService.createDataset("project-id", "dataset-id", "", "");
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withDatasetService(datasetService);
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    p.apply(Create.of(
+        new TableRow().set("name", "a").set("number", 1),
+        new TableRow().set("name", "b").set("number", 2),
+        new TableRow().set("name", "c").set("number", 3),
+        new TableRow().set("name", "d").set("number", 4))
+          .withCoder(TableRowJsonCoder.of()))
+            .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
+            .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
+                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                .withSchema(new TableSchema().setFields(
+                    ImmutableList.of(
+                        new TableFieldSchema().setName("name").setType("STRING"),
+                        new TableFieldSchema().setName("number").setType("INTEGER"))))
+                .withTestServices(fakeBqServices)
+                .withoutValidation());
+    p.run();
+
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id"),
+        containsInAnyOrder(
+            new TableRow().set("name", "a").set("number", 1),
+            new TableRow().set("name", "b").set("number", 2),
+            new TableRow().set("name", "c").set("number", 3),
+            new TableRow().set("name", "d").set("number", 4)));
+  }
+
+  /**
+   * A generic window function that allows partitioning data into windows by a string value.
+   *
+   * <p>Logically, this creates multiple global windows, and the user provides a function that
+   * decides which global window each value should go into.
+   */
+  private static class PartitionedGlobalWindows
+      extends NonMergingWindowFn<TableRow, PartitionedGlobalWindow> {
+    private SerializableFunction<TableRow, String> extractPartition;
+
+    public PartitionedGlobalWindows(SerializableFunction<TableRow, String> extractPartition) {
+      this.extractPartition = extractPartition;
+    }
+
+    @Override
+    public Collection<PartitionedGlobalWindow> assignWindows(AssignContext c) {
+      return Collections.singletonList(new PartitionedGlobalWindow(
+          extractPartition.apply(c.element())));
+    }
+
+    @Override
+    public boolean isCompatible(WindowFn<?, ?> o) {
+      return o instanceof PartitionedGlobalWindows;
+    }
+
+    @Override
+    public Coder<PartitionedGlobalWindow> windowCoder() {
+      return new PartitionedGlobalWindowCoder();
+    }
+
+    @Override
+    public PartitionedGlobalWindow getSideInputWindow(BoundedWindow window) {
+      throw new UnsupportedOperationException(
+          "PartitionedGlobalWindows is not allowed in side inputs");
+    }
+
+    @Override
+    public Instant getOutputTime(Instant inputTimestamp, PartitionedGlobalWindow window) {
+      return inputTimestamp;
+    }
+  }
+
+  /**
+   * Custom Window object that encodes a String value.
+   */
+  private static class PartitionedGlobalWindow extends BoundedWindow {
+    String value;
+
+    public PartitionedGlobalWindow(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public Instant maxTimestamp() {
+      return GlobalWindow.INSTANCE.maxTimestamp();
+    }
+
+    // The following methods are only needed due to BEAM-1022. Once this issue is fixed, we will
+    // no longer need these.
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof PartitionedGlobalWindow) {
+        return value.equals(((PartitionedGlobalWindow) other).value);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return value.hashCode();
+    }
+  }
+
+  /**
+   * Coder for {@link PartitionedGlobalWindow}.
+   */
+  private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> {
+    @Override
+    public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
+        throws IOException, CoderException {
+      StringUtf8Coder.of().encode(window.value, outStream, context);
+    }
+
+    @Override
+    public PartitionedGlobalWindow decode(InputStream inStream, Context context)
+        throws IOException, CoderException {
+      return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testStreamingWriteWithWindowFn() throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeDatasetService datasetService = new FakeDatasetService();
+    datasetService.createDataset("project-id", "dataset-id", "", "");
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withDatasetService(datasetService);
+
+    List<TableRow> inserts = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      inserts.add(new TableRow().set("name", "number" + i).set("number", i));
+    }
+
+    // Create a windowing strategy that puts the input into five different windows depending on
+    // record value.
+    WindowFn<TableRow, PartitionedGlobalWindow> window = new PartitionedGlobalWindows(
+        new SerializableFunction<TableRow, String>() {
+          @Override
+          public String apply(TableRow value) {
+            try {
+              int intValue = (Integer) value.get("number") % 5;
+              return Integer.toString(intValue);
+            } catch (NumberFormatException e) {
+              fail(e.toString());
+            }
+            return value.toString();
+          }
+        }
+    );
+
+    SerializableFunction<BoundedWindow, String> tableFunction =
+        new SerializableFunction<BoundedWindow, String>() {
+          @Override
+          public String apply(BoundedWindow input) {
+            return "project-id:dataset-id.table-id-" + ((PartitionedGlobalWindow) input).value;
+          }
+        };
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    p.apply(Create.of(inserts).withCoder(TableRowJsonCoder.of()))
+        .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
+        .apply(Window.<TableRow>into(window))
+        .apply(BigQueryIO.Write
+            .to(tableFunction)
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withSchema(new TableSchema().setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new TableFieldSchema().setName("number").setType("INTEGER"))))
+            .withTestServices(fakeBqServices)
+            .withoutValidation());
+    p.run();
+
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-0"),
+        containsInAnyOrder(
+            new TableRow().set("name", "number0").set("number", 0),
+            new TableRow().set("name", "number5").set("number", 5)));
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-1"),
+        containsInAnyOrder(
+            new TableRow().set("name", "number1").set("number", 1),
+            new TableRow().set("name", "number6").set("number", 6)));
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-2"),
+        containsInAnyOrder(
+            new TableRow().set("name", "number2").set("number", 2),
+            new TableRow().set("name", "number7").set("number", 7)));
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-3"),
+        containsInAnyOrder(
+            new TableRow().set("name", "number3").set("number", 3),
+            new TableRow().set("name", "number8").set("number", 8)));
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-4"),
+        containsInAnyOrder(
+            new TableRow().set("name", "number4").set("number", 4),
+            new TableRow().set("name", "number9").set("number", 9)));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
   public void testWriteUnknown() throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("defaultProject");
@@ -1031,7 +1292,8 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWrite() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
+    BigQueryIO.Write.Bound bound =
+        BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
     checkWriteObject(
         bound, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -1980,32 +2242,42 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testRuntimeOptionsNotCalledInApplyInputTable() {
+  public void testRuntimeOptionsNotCalledInApplyInputTable() throws IOException {
     RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    bqOptions.setTempLocation("gs://testbucket/testdir");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService());
     Pipeline pipeline = TestPipeline.create(options);
     pipeline
-        .apply(BigQueryIO.Read.from(options.getInputTable()).withoutValidation())
-        .apply(BigQueryIO.Write
+        .apply(BigQueryIO.Read
+            .from(options.getInputTable()).withoutValidation()
+            .withTestServices(fakeBqServices))
+        .apply(BigQueryIO.Write
             .to(options.getOutputTable())
             .withSchema(NestedValueProvider.of(
                 options.getOutputSchema(), new JsonSchemaToTableSchema()))
+            .withTestServices(fakeBqServices)
             .withoutValidation());
   }
 
   @Test
-  public void testRuntimeOptionsNotCalledInApplyInputQuery() {
+  public void testRuntimeOptionsNotCalledInApplyInputQuery() throws IOException {
     RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    bqOptions.setTempLocation("gs://testbucket/testdir");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService());
     Pipeline pipeline = TestPipeline.create(options);
     pipeline
-        .apply(BigQueryIO.Read.fromQuery(options.getInputQuery()).withoutValidation())
-        .apply(BigQueryIO.Write
+        .apply(BigQueryIO.Read
+            .fromQuery(options.getInputQuery()).withoutValidation()
+            .withTestServices(fakeBqServices))
+        .apply(BigQueryIO.Write
             .to(options.getOutputTable())
             .withSchema(NestedValueProvider.of(
                 options.getOutputSchema(), new JsonSchemaToTableSchema()))
+            .withTestServices(fakeBqServices)
             .withoutValidation());
   }
 

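The rewritten FakeDatasetService keeps its storage in a static, synchronized table
because every DoFn receives its own deserialized copy of the fake, yet all copies must
observe the same rows for the final assertThat(...) calls to see what the streaming
write produced. A condensed sketch of that serializable-fake pattern (FakeSink is an
illustrative stand-in, not a Beam class):

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // A serializable fake whose instances all write to one static store, so rows added
    // by deserialized copies on worker threads stay visible to the test method.
    class FakeSink implements Serializable {
      private static final List<String> ROWS =
          Collections.synchronizedList(new ArrayList<String>());

      static void reset() { ROWS.clear(); }       // call from @Before, like clearCreatedTables()

      void insert(String row) { ROWS.add(row); }  // invoked on worker threads

      static List<String> allRows() {             // consumed by the test's assertions
        synchronized (ROWS) {
          return new ArrayList<>(ROWS);
        }
      }
    }
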
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 0e76660..10ed8bd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -17,9 +17,11 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static com.google.common.base.Verify.verifyNotNull;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.times;
@@ -47,9 +49,12 @@ import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
+import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.cloud.hadoop.util.RetryBoundedBackOff;
 import com.google.common.collect.ImmutableList;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -64,6 +69,7 @@ import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.Transport;
+import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -325,7 +331,8 @@ public class BigQueryServicesImplTest {
         bigquery.tables().get("projectId", "datasetId", "tableId"),
         "Failed to get table.",
         Sleeper.DEFAULT,
-        BackOff.STOP_BACKOFF);
+        BackOff.STOP_BACKOFF,
+        BigQueryServicesImpl.ALWAYS_RETRY);
 
     assertEquals(testTable, table);
     verify(response, times(1)).getStatusCode();
@@ -358,6 +365,11 @@ public class BigQueryServicesImplTest {
     verify(response, times(2)).getContentType();
     expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
   }
+
+  // A BackOff that makes a total of 4 attempts.
+  private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT
+      .withInitialBackoff(Duration.millis(1))
+      .withExponent(1)
+      .withMaxRetries(3);
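
Two details of TEST_BACKOFF are worth noting: withExponent(1) keeps the interval flat at roughly one millisecond instead of growing, and withMaxRetries(3) caps the sequence at three retries after the initial attempt, which is where the "4 attempts" in the comment comes from. A caller materializes it per use, along the lines of:

    // Each call site gets a fresh, independent BackOff; after three
    // ~1ms intervals, nextBackOffMillis() returns BackOff.STOP.
    BackOff backoff = TEST_BACKOFF.backoff();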
 
   /**
    * Tests that {@link DatasetServiceImpl#insertAll} retries selected rows on failure.
@@ -371,7 +383,8 @@ public class BigQueryServicesImplTest {
     List<String> insertIds = ImmutableList.of("a", "b");
 
     final TableDataInsertAllResponse bFailed = new TableDataInsertAllResponse()
-        .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));
+        .setInsertErrors(ImmutableList.of(
+            new InsertErrors().setIndex(1L).setErrors(ImmutableList.of(new ErrorProto()))));
 
     final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();
 
@@ -389,9 +402,6 @@ public class BigQueryServicesImplTest {
     expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
   }
 
-  // A BackOff that makes a total of 4 attempts
-  private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(3);
-
   /**
   * Tests that {@link DatasetServiceImpl#insertAll} fails gracefully when errors persist.
    */
@@ -490,9 +500,128 @@ public class BigQueryServicesImplTest {
     GoogleJsonError error = new GoogleJsonError();
     error.setErrors(ImmutableList.of(info));
     error.setCode(status);
+    error.setMessage(reason);
     // The actual JSON response is an error container.
     GoogleJsonErrorContainer container = new GoogleJsonErrorContainer();
     container.setError(error);
     return container;
   }
+
+  @Test
+  public void testCreateTableSucceeds() throws IOException {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    Table testTable = new Table().setTableReference(ref);
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContent()).thenReturn(toStream(testTable));
+
+    BigQueryServicesImpl.DatasetServiceImpl services =
+        new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    Table ret =
+        services.tryCreateTable(
+            testTable,
+            new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
+            Sleeper.DEFAULT);
+    assertEquals(testTable, ret);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  /**
+   * Tests that {@link BigQueryServicesImpl} does not retry non-rate-limited attempts.
+   */
+  @Test
+  public void testCreateTableDoesNotRetry() throws IOException {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    Table testTable = new Table().setTableReference(ref);
+    // First response is a non-rate-limit 403; the second response carries a valid
+    // payload but should never be consumed.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
+        .thenReturn(toStream(testTable));
+
+    thrown.expect(GoogleJsonResponseException.class);
+    thrown.expectMessage("actually forbidden");
+
+    BigQueryServicesImpl.DatasetServiceImpl services =
+        new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    try {
+      services.tryCreateTable(
+          testTable,
+          new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
+          Sleeper.DEFAULT);
+      fail();
+    } catch (IOException e) {
+      verify(response, times(1)).getStatusCode();
+      verify(response, times(1)).getContent();
+      verify(response, times(1)).getContentType();
+      throw e;
+    }
+  }
+
+  /**
+   * Tests that an existing table (409) is treated as success: tryCreateTable returns null.
+   */
+  @Test
+  public void testCreateTableSucceedsAlreadyExists() throws IOException {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    TableSchema schema = new TableSchema().setFields(ImmutableList.of(
+        new TableFieldSchema().setName("column1").setType("String"),
+        new TableFieldSchema().setName("column2").setType("Integer")));
+    Table testTable = new Table().setTableReference(ref).setSchema(schema);
+
+    when(response.getStatusCode()).thenReturn(409); // 409 means already exists
+
+    BigQueryServicesImpl.DatasetServiceImpl services =
+        new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    Table ret =
+        services.tryCreateTable(
+            testTable,
+            new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
+            Sleeper.DEFAULT);
+
+    assertNull(ret);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  /**
+   * Tests that {@link BigQueryServicesImpl} retries quota rate limited attempts.
+   */
+  @Test
+  public void testCreateTableRetry() throws IOException {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    Table testTable = new Table().setTableReference(ref);
+
+    // First response is 403 rate limited, second response has valid payload.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+        .thenReturn(toStream(testTable));
+
+    BigQueryServicesImpl.DatasetServiceImpl services =
+        new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    Table ret =
+        services.tryCreateTable(
+            testTable,
+            new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
+            Sleeper.DEFAULT);
+    assertEquals(testTable, ret);
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+    verifyNotNull(ret.getTableReference());
+    expectedLogs.verifyInfo(
+        "Quota limit reached when creating table project:dataset.table, "
+            + "retrying up to 5.0 minutes");
+  }
 }
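
Read together, the four new cases pin down the tryCreateTable contract: return the table on 200, return null on 409 (already exists), retry only a 403 whose reason is rateLimitExceeded, and rethrow anything else. The following is an illustrative sketch of that decision logic written for this review, not the committed DatasetServiceImpl code; the helper name and error unpacking are ours:

    import java.io.IOException;
    import com.google.api.client.googleapis.json.GoogleJsonResponseException;
    import com.google.api.client.util.BackOff;
    import com.google.api.client.util.BackOffUtils;
    import com.google.api.client.util.Sleeper;
    import com.google.api.services.bigquery.Bigquery;
    import com.google.api.services.bigquery.model.Table;

    /** Illustrative only; the real logic lives in DatasetServiceImpl. */
    static Table tryCreateTableSketch(
        Bigquery client, Table table, BackOff backoff, Sleeper sleeper)
        throws IOException, InterruptedException {
      String project = table.getTableReference().getProjectId();
      String dataset = table.getTableReference().getDatasetId();
      while (true) {
        try {
          // Success: the created table is returned to the caller.
          return client.tables().insert(project, dataset, table).execute();
        } catch (GoogleJsonResponseException e) {
          if (e.getStatusCode() == 409) {
            return null;  // 409 Conflict: table already exists; not an error.
          }
          boolean rateLimited = e.getDetails() != null
              && e.getDetails().getErrors() != null
              && !e.getDetails().getErrors().isEmpty()
              && "rateLimitExceeded".equals(
                  e.getDetails().getErrors().get(0).getReason());
          // Only quota/rate-limit 403s are retried, and only while the
          // BackOff still has budget; everything else propagates.
          if (!rateLimited || !BackOffUtils.next(sleeper, backoff)) {
            throw e;
          }
        }
      }
    }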

http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
deleted file mode 100644
index fb79c74..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import static com.google.common.base.Verify.verifyNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.googleapis.json.GoogleJsonError;
-import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
-import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.json.Json;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.cloud.hadoop.util.RetryBoundedBackOff;
-import com.google.common.collect.ImmutableList;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-import org.apache.beam.sdk.util.Transport;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests of {@link BigQueryTableInserter}.
- */
-@RunWith(JUnit4.class)
-public class BigQueryTableInserterTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryTableInserter.class);
-  @Mock private LowLevelHttpResponse response;
-  private Bigquery bigquery;
-  private PipelineOptions options;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-
-    // A mock transport that lets us mock the API responses.
-    MockHttpTransport transport =
-        new MockHttpTransport.Builder()
-            .setLowLevelHttpRequest(
-                new MockLowLevelHttpRequest() {
-                  @Override
-                  public LowLevelHttpResponse execute() throws IOException {
-                    return response;
-                  }
-                })
-            .build();
-
-    // A sample BigQuery API client that uses default JsonFactory and RetryHttpInitializer.
-    bigquery =
-        new Bigquery.Builder(
-                transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())
-            .build();
-
-    options = PipelineOptionsFactory.create();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    // These three interactions happen for every request in the normal response parsing.
-    verify(response, atLeastOnce()).getContentEncoding();
-    verify(response, atLeastOnce()).getHeaderCount();
-    verify(response, atLeastOnce()).getReasonPhrase();
-    verifyNoMoreInteractions(response);
-  }
-
-  /** A helper to wrap a {@link GenericJson} object in a content stream. */
-  private static InputStream toStream(GenericJson content) throws IOException {
-    return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));
-  }
-
-  /** A helper that generates the error JSON payload that Google APIs produce. */
-  private static GoogleJsonErrorContainer errorWithReasonAndStatus(String reason, int status) {
-    ErrorInfo info = new ErrorInfo();
-    info.setReason(reason);
-    info.setDomain("global");
-    // GoogleJsonError contains one or more ErrorInfo objects; our utilities read the first one.
-    GoogleJsonError error = new GoogleJsonError();
-    error.setErrors(ImmutableList.of(info));
-    error.setCode(status);
-    // The actual JSON response is an error container.
-    GoogleJsonErrorContainer container = new GoogleJsonErrorContainer();
-    container.setError(error);
-    return container;
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter} succeeds on the first try.
-   */
-  @Test
-  public void testCreateTableSucceeds() throws IOException {
-    Table testTable = new Table().setDescription("a table");
-
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(200);
-    when(response.getContent()).thenReturn(toStream(testTable));
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-    Table ret =
-        inserter.tryCreateTable(
-            new Table(),
-            "project",
-            "dataset",
-            new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
-            Sleeper.DEFAULT);
-    assertEquals(testTable, ret);
-    verify(response, times(1)).getStatusCode();
-    verify(response, times(1)).getContent();
-    verify(response, times(1)).getContentType();
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter} succeeds when the table already exists.
-   */
-  @Test
-  public void testCreateTableSucceedsAlreadyExists() throws IOException {
-    when(response.getStatusCode()).thenReturn(409); // 409 means already exists
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-    Table ret =
-        inserter.tryCreateTable(
-            new Table(),
-            "project",
-            "dataset",
-            new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
-            Sleeper.DEFAULT);
-
-    assertNull(ret);
-    verify(response, times(1)).getStatusCode();
-    verify(response, times(1)).getContent();
-    verify(response, times(1)).getContentType();
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter} retries quota rate limited attempts.
-   */
-  @Test
-  public void testCreateTableRetry() throws IOException {
-    TableReference ref =
-        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    Table testTable = new Table().setTableReference(ref);
-
-    // First response is 403 rate limited, second response has valid payload.
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
-    when(response.getContent())
-        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
-        .thenReturn(toStream(testTable));
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-    Table ret =
-        inserter.tryCreateTable(
-            testTable,
-            "project",
-            "dataset",
-            new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
-            Sleeper.DEFAULT);
-    assertEquals(testTable, ret);
-    verify(response, times(2)).getStatusCode();
-    verify(response, times(2)).getContent();
-    verify(response, times(2)).getContentType();
-    verifyNotNull(ret.getTableReference());
-    expectedLogs.verifyInfo(
-        "Quota limit reached when creating table project:dataset.table, "
-            + "retrying up to 5.0 minutes");
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter} does not retry non-rate-limited attempts.
-   */
-  @Test
-  public void testCreateTableDoesNotRetry() throws IOException {
-    Table testTable = new Table().setDescription("a table");
-
-    // First response is 403 not-rate-limited, second response has valid payload but should not
-    // be invoked.
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
-    when(response.getContent())
-        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
-        .thenReturn(toStream(testTable));
-
-    thrown.expect(GoogleJsonResponseException.class);
-    thrown.expectMessage("actually forbidden");
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-    try {
-      inserter.tryCreateTable(
-          new Table(),
-          "project",
-          "dataset",
-          new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
-          Sleeper.DEFAULT);
-      fail();
-    } catch (IOException e) {
-      verify(response, times(1)).getStatusCode();
-      verify(response, times(1)).getContent();
-      verify(response, times(1)).getContentType();
-      throw e;
-    }
-  }
-}
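
None of the deleted coverage is lost: each case above reappears in BigQueryServicesImplTest, rewritten against DatasetServiceImpl. The user-visible signature change is that tryCreateTable now derives the project and dataset from the Table's own TableReference instead of taking them as separate string arguments, roughly:

    // Before (BigQueryTableInserter, removed in this commit):
    inserter.tryCreateTable(table, "project", "dataset", backoff, sleeper);
    // After (BigQueryServicesImpl.DatasetServiceImpl):
    services.tryCreateTable(table, backoff, sleeper);  // reads table.getTableReference()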

http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index e539b33..8130238 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -361,60 +361,18 @@ public class BigQueryUtilTest {
   }
 
   @Test
-  public void testWriteAppend() throws IOException {
-    onTableGet(basicTableSchema());
-
-    TableReference ref = BigQueryIO
-        .parseTableSpec("project:dataset.table");
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
-
-    inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_APPEND,
-        BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
-
-    verifyTableGet();
-  }
-
-  @Test
-  public void testWriteEmpty() throws IOException {
+  public void testTableGet() throws InterruptedException, IOException {
     onTableGet(basicTableSchema());
 
     TableDataList dataList = new TableDataList().setTotalRows(0L);
     onTableList(dataList);
 
-    TableReference ref = BigQueryIO
-        .parseTableSpec("project:dataset.table");
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
+    BigQueryServicesImpl.DatasetServiceImpl services =
+            new BigQueryServicesImpl.DatasetServiceImpl(mockClient, options);
 
-    inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
-        BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
+    services.getTable("project", "dataset", "table");
 
     verifyTableGet();
-    verifyTabledataList();
-  }
-
-  @Test
-  public void testWriteEmptyFail() throws IOException {
-    thrown.expect(IOException.class);
-
-    onTableGet(basicTableSchema());
-
-    TableDataList dataList = rawDataList(rawRow("Arthur", 42));
-    onTableList(dataList);
-
-    TableReference ref = BigQueryIO
-        .parseTableSpec("project:dataset.table");
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
-
-    try {
-      inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
-          BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
-    } finally {
-      verifyTableGet();
-      verifyTabledataList();
-    }
   }
 
   @Test


[3/3] beam git commit: This closes #1710

Posted by dh...@apache.org.
This closes #1710


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e77080a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e77080a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e77080a4

Branch: refs/heads/release-0.4.0
Commit: e77080a4d4358d0149d3c1e554581a254ea26506
Parents: 4820160 b1459de
Author: Dan Halperin <dh...@google.com>
Authored: Wed Dec 28 21:10:23 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 21:10:23 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  71 ++-
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |   7 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 121 ++++-
 .../io/gcp/bigquery/BigQueryTableInserter.java  | 217 ---------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 484 +++++++++++++++----
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 139 +++++-
 .../gcp/bigquery/BigQueryTableInserterTest.java | 245 ----------
 .../sdk/io/gcp/bigquery/BigQueryUtilTest.java   |  50 +-
 8 files changed, 696 insertions(+), 638 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: BigQueryIO: fix streaming write, typo in API

Posted by dh...@apache.org.
BigQueryIO: fix streaming write, typo in API

and improve testing


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b1459dea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b1459dea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b1459dea

Branch: refs/heads/release-0.4.0
Commit: b1459deacfc7077b49803b9460c5127cd4fb74bf
Parents: c53a332
Author: Sam McVeety <sg...@google.com>
Authored: Fri Dec 16 18:10:28 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 14:22:34 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 23 +++++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 72 ++++++++++++--------
 2 files changed, 63 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b1459dea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 28049ed..7bb1e51 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -370,7 +370,8 @@ public class BigQueryIO {
     }
   }
 
-  private static class TableSpecToTableRef
+  @VisibleForTesting
+  static class TableSpecToTableRef
       implements SerializableFunction<String, TableReference> {
     @Override
     public TableReference apply(String from) {
@@ -807,6 +808,7 @@ public class BigQueryIO {
       /**
        * Returns the query to be read, or {@code null} if reading from a table instead.
        */
+      @Nullable
       public String getQuery() {
         return query == null ? null : query.get();
       }
@@ -814,7 +816,8 @@ public class BigQueryIO {
       /**
        * Returns the query to be read, or {@code null} if reading from a table instead.
        */
-      public ValueProvider<String> getQueryProivder() {
+      @Nullable
+      public ValueProvider<String> getQueryProvider() {
         return query;
       }
 
@@ -2813,7 +2816,8 @@ public class BigQueryIO {
    * a randomUUID is generated only once per bucket of data. The actual unique
    * id is created by concatenating this randomUUID with a sequential number.
    */
-  private static class TagWithUniqueIdsAndTable
+  @VisibleForTesting
+  static class TagWithUniqueIdsAndTable
       extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> {
     /** TableSpec to write to. */
     private final ValueProvider<String> tableSpec;
@@ -2830,8 +2834,12 @@ public class BigQueryIO {
       checkArgument(table == null ^ tableRefFunction == null,
           "Exactly one of table or tableRefFunction should be set");
       if (table != null) {
-        if (table.isAccessible() && table.get().getProjectId() == null) {
-          table.get().setProjectId(options.as(BigQueryOptions.class).getProject());
+        if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
+          TableReference tableRef = table.get()
+              .setProjectId(options.as(BigQueryOptions.class).getProject());
+          table = NestedValueProvider.of(
+              StaticValueProvider.of(toJsonString(tableRef)),
+              new JsonTableRefToTableRef());
         }
         this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
       } else {
@@ -2870,6 +2878,11 @@ public class BigQueryIO {
       }
     }
 
+    @VisibleForTesting
+    ValueProvider<String> getTableSpec() {
+      return tableSpec;
+    }
+
     private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) {
       if (tableSpec != null) {
         return tableSpec.get();

http://git-wip-us.apache.org/repos/asf/beam/blob/b1459dea/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index b78316f..dc566d2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,6 +26,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -2242,43 +2243,60 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testRuntimeOptionsNotCalledInApplyInputTable() throws IOException {
+  public void testRuntimeOptionsNotCalledInApplyInputTable() {
     RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService());
+    bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    pipeline
-        .apply(BigQueryIO.Read
-            .from(options.getInputTable()).withoutValidation()
-            .withTestServices(fakeBqServices))
-            .apply(BigQueryIO.Write
-            .to(options.getOutputTable())
-            .withSchema(NestedValueProvider.of(
-                options.getOutputSchema(), new JsonSchemaToTableSchema()))
-            .withTestServices(fakeBqServices)
-            .withoutValidation());
+    BigQueryIO.Read.Bound read = BigQueryIO.Read.from(
+        options.getInputTable()).withoutValidation();
+    pipeline.apply(read);
+    // Test that this doesn't throw.
+    DisplayData.from(read);
   }
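
The rewritten tests make the contract explicit: neither applying the transform nor rendering its DisplayData may force a runtime-only ValueProvider with get(). Code that honors this typically branches on accessibility; a sketch with illustrative names (tableProvider and builder are not from this patch):

    // Dereference only when the value is already available; otherwise
    // describe the provider symbolically so display data never throws.
    if (tableProvider.isAccessible()) {
      builder.add(DisplayData.item("table", tableProvider.get()));
    } else {
      builder.add(DisplayData.item("table", "<runtime table>"));
    }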
 
   @Test
-  public void testRuntimeOptionsNotCalledInApplyInputQuery() throws IOException {
+  public void testRuntimeOptionsNotCalledInApplyInputQuery() {
     RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService());
+    bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
+    BigQueryIO.Read.Bound read = BigQueryIO.Read.fromQuery(
+        options.getInputQuery()).withoutValidation();
+    pipeline.apply(read);
+    // Test that this doesn't throw.
+    DisplayData.from(read);
+  }
+
+  @Test
+  public void testRuntimeOptionsNotCalledInApplyOutput() {
+    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+    Pipeline pipeline = TestPipeline.create(options);
+    BigQueryIO.Write.Bound write = BigQueryIO.Write
+        .to(options.getOutputTable())
+        .withSchema(NestedValueProvider.of(
+            options.getOutputSchema(), new JsonSchemaToTableSchema()))
+        .withoutValidation();
     pipeline
-        .apply(BigQueryIO.Read
-            .fromQuery(options.getInputQuery()).withoutValidation()
-            .withTestServices(fakeBqServices))
-            .apply(BigQueryIO.Write
-            .to(options.getOutputTable())
-            .withSchema(NestedValueProvider.of(
-                options.getOutputSchema(), new JsonSchemaToTableSchema()))
-            .withTestServices(fakeBqServices)
-            .withoutValidation());
+        .apply(Create.<TableRow>of())
+        .apply(write);
+    // Test that this doesn't throw.
+    DisplayData.from(write);
+  }
+
+  @Test
+  public void testTagWithUniqueIdsAndTableProjectNotNullWithNvp() {
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("project");
+    BigQueryIO.TagWithUniqueIdsAndTable tag =
+        new BigQueryIO.TagWithUniqueIdsAndTable(
+            bqOptions, NestedValueProvider.of(
+                StaticValueProvider.of("data_set.table_name"),
+                new BigQueryIO.TableSpecToTableRef()), null);
+    TableReference table = BigQueryIO.parseTableSpec(tag.getTableSpec().get());
+    assertNotNull(table.getProjectId());
   }
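
This test guards the TagWithUniqueIdsAndTable fix from the BigQueryIO.java hunk above: when a statically accessible table reference omits the project, the constructor now fills it in from BigQueryOptions and re-wraps the result through a JSON round-trip so the defaulted value survives serialization of the DoFn. In outline, using only helpers visible in this diff:

    // Default the project, then re-wrap: mutating table.get() in place
    // would be lost once the ValueProvider is serialized with the DoFn.
    TableReference tableRef = table.get()
        .setProjectId(options.as(BigQueryOptions.class).getProject());
    table = NestedValueProvider.of(
        StaticValueProvider.of(toJsonString(tableRef)),
        new JsonTableRefToTableRef());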
 
   private static void testNumFiles(File tempDir, int expectedNumFiles) {