You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:48 UTC

[14/50] [abbrv] beam git commit: Fix tests to properly fake out BigQueryService, and add tests for dynamic-table functionality.

Fix tests to properly fake out BigQueryService, and add tests for dynamic-table functionality.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b486137d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b486137d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b486137d

Branch: refs/heads/DSL_SQL
Commit: b486137d2190db9212a92176f703e6ed7858fe59
Parents: 760a945
Author: Reuven Lax <re...@google.com>
Authored: Fri Mar 31 14:16:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:50 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |   7 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  15 +-
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java    |   2 +-
 .../sdk/io/gcp/bigquery/StreamingInserts.java   |   5 +-
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |   1 -
 .../sdk/io/gcp/bigquery/TableDestination.java   |   3 +-
 .../sdk/io/gcp/bigquery/TableRowWriter.java     |   3 +-
 .../sdk/io/gcp/bigquery/TagWithUniqueIds.java   |   9 -
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  12 +-
 .../sdk/io/gcp/bigquery/WritePartition.java     |  13 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 613 ++++++++++---------
 .../io/gcp/bigquery/FakeBigQueryServices.java   | 114 +++-
 .../sdk/io/gcp/bigquery/FakeDatasetService.java | 138 +++--
 .../sdk/io/gcp/bigquery/FakeJobService.java     | 182 +++++-
 .../sdk/io/gcp/bigquery/TableContainer.java     |  33 +-
 15 files changed, 703 insertions(+), 447 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 5e80fae..06fdfce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -58,9 +58,8 @@ import org.apache.beam.sdk.values.TupleTagList;
 /**
  * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
  */
-class BatchLoads<T> extends
-    PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
-  BigQueryIO.Write<T> write;
+class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
+  BigQueryIO.Write<?> write;
 
   private static class ConstantSchemaFunction implements
       SerializableFunction<TableDestination, TableSchema> {
@@ -79,7 +78,7 @@ class BatchLoads<T> extends
     }
   }
 
-  BatchLoads(BigQueryIO.Write<T> write) {
+  BatchLoads(BigQueryIO.Write<?> write) {
     this.write = write;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f1baaf7..54a25c7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -64,7 +64,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.Transport;
@@ -536,7 +535,7 @@ public class BigQueryIO {
                 }
               }
               if (extractFiles != null && !extractFiles.isEmpty()) {
-                new GcsUtilFactory().create(options).remove(extractFiles);
+                IOChannelUtils.getFactory(extractFiles.iterator().next()).remove(extractFiles);
               }
             }
           };
@@ -701,8 +700,8 @@ public class BigQueryIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef);
-      abstract Builder<T> setTableRefFunction(
-          SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction);
+      abstract Builder<T> setTableFunction(
+          SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction);
       abstract Builder<T> setFormatFunction(
           SerializableFunction<T, TableRow> formatFunction);
       abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema);
@@ -823,8 +822,7 @@ public class BigQueryIO {
      * {@link ValueInSingleWindow}, so can be determined by the value or by the window.
      */
     public Write<T> to(
-        SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
-      return toTableReference(new TranslateTableSpecFunction<T>(tableSpecFunction));
+        SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {
       ensureToNotCalledYet();
       return toBuilder().setTableFunction(tableFunction).build();
     }
@@ -834,7 +832,7 @@ public class BigQueryIO {
      * {@link TableReference} instead of a string table specification.
      */
     private Write<T> toTableReference(
-        SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction) {
+        SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {
       ensureToNotCalledYet();
       return toBuilder().setTableFunction(tableFunction).build();
     }
@@ -984,8 +982,7 @@ public class BigQueryIO {
       if (input.isBounded() == IsBounded.UNBOUNDED) {
         return rowsWithDestination.apply(new StreamingInserts(this));
       } else {
-
-        return rowsWithDestination.apply(new BatchLoads<T>(this));
+        return rowsWithDestination.apply(new BatchLoads(this));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
index ab57446..09b4fbf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
@@ -56,7 +56,7 @@ class ShardedKey<K> implements Serializable {
       return false;
     }
     ShardedKey<K> other = (ShardedKey<K>) o;
-    return (key == other.key) && (shardNumber == other.shardNumber);
+    return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index 37afbdf..ced1d66 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -38,9 +38,8 @@ import org.apache.beam.sdk.values.PCollection;
 * PTransform that performs streaming BigQuery write. To increase consistency,
 * it leverages BigQuery best effort de-dup mechanism.
  */
-
-class StreamingInserts
-    extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
+class StreamingInserts extends PTransform<PCollection<KV<TableDestination, TableRow>>,
+    WriteResult> {
   private final Write<?> write;
 
   private static class ConstantSchemaFunction implements

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index 83ed3d2..22b2078 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -98,7 +98,6 @@ class StreamingWriteFn
   private void flushRows(TableReference tableReference,
       List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options)
           throws InterruptedException {
-    System.out.println("FlUSHING ROWS " + tableRows.size());
     if (!tableRows.isEmpty()) {
       try {
         long totalBytes = bqServices.getDatasetService(options).insertAll(

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index e8538e0..36e1401 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -64,7 +64,8 @@ public class TableDestination implements Serializable {
       return false;
     }
     TableDestination other = (TableDestination) o;
-    return (tableSpec == other.tableSpec) && (tableDescription == other.tableDescription);
+    return Objects.equals(this.tableSpec, other.tableSpec)
+        && Objects.equals(this.tableDescription, other.tableDescription);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
index a1f6153..ee8f466 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +56,7 @@ class TableRowWriter {
     }
   }
   TableRowWriter(String basename) {
-    this.tempFilePrefix = basename;
+      this.tempFilePrefix = basename;
   }
 
   public final void open(String uId) throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index 6f0186e..7379784 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -18,23 +18,14 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
 import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /**
  * Fn that tags each table row with a unique id and destination table.

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index b8069f6..869e68a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -19,19 +19,16 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
-
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.UUID;
-
-import com.google.common.collect.Maps;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -50,6 +47,10 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
   private transient Map<TableDestination, TableRowWriter> writers;
   private final String tempFilePrefix;
 
+  /**
+   * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
+   * and encapsulates the table it is destined for as well as the file byte size.
+   */
   public static class Result implements Serializable {
     public String filename;
     public Long fileByteSize;
@@ -62,6 +63,9 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
     }
   }
 
+  /**
+   * A coder for the {@link Result} class.
+   */
   public static class ResultCoder extends AtomicCoder<Result> {
     private static final ResultCoder INSTANCE = new ResultCoder();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index c48955b..9c48b82 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -89,8 +89,8 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
         partitions.add(Lists.<String>newArrayList());
         currResultsMap.put(tableDestination, partitions);
       }
-      int currNumFiles = currNumFilesMap.getOrDefault(tableDestination, 0);
-      long currSizeBytes = currSizeBytesMap.getOrDefault(tableDestination, 0L);
+      int currNumFiles = getOrDefault(currNumFilesMap, tableDestination, 0);
+      long currSizeBytes = getOrDefault(currSizeBytesMap, tableDestination, 0L);
       if (currNumFiles + 1 > Write.MAX_NUM_FILES
           || currSizeBytes + fileResult.fileByteSize > Write.MAX_SIZE_BYTES) {
         // Add a new partition for this table.
@@ -117,4 +117,13 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
       }
     }
   }
+
+  private <T> T getOrDefault(Map<TableDestination, T> map, TableDestination tableDestination,
+                     T defaultValue) {
+    if (map.containsKey(tableDestination)) {
+      return map.get(tableDestination);
+    } else {
+      return defaultValue;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d1ef8e2..f10be13 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -26,17 +28,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.json.GenericJson;
+
 import com.google.api.client.util.Data;
 import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatistics2;
 import com.google.api.services.bigquery.model.JobStatistics4;
@@ -48,7 +42,7 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -58,9 +52,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -69,14 +66,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -88,7 +81,6 @@ import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.options.BigQueryOptions;
@@ -122,7 +114,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.PCollectionViews;
@@ -140,6 +131,7 @@ import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -147,10 +139,6 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
 
 /**
  * Tests for BigQueryIO.
@@ -158,6 +146,8 @@ import org.mockito.MockitoAnnotations;
 @RunWith(JUnit4.class)
 public class BigQueryIOTest implements Serializable {
 
+  private static Path tempFolder;
+
   // Table information must be static, as each ParDo will get a separate instance of
   // FakeDatasetServices, and they must all modify the same storage.
   static com.google.common.collect.Table<String, String, Map<String, TableContainer>>
@@ -169,8 +159,6 @@ public class BigQueryIOTest implements Serializable {
   @Rule public transient ExpectedLogs loggedWriteRename = ExpectedLogs.none(WriteRename.class);
   @Rule public transient ExpectedLogs loggedWriteTables = ExpectedLogs.none(WriteTables.class);
   @Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
-  @Mock private transient IOChannelFactory mockIOChannelFactory;
-  @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService;
 
   private void checkReadTableObject(
       BigQueryIO.Read read, String project, String dataset, String table) {
@@ -227,9 +215,13 @@ public class BigQueryIOTest implements Serializable {
     assertEquals(validate, write.getValidate());
   }
 
+  @BeforeClass
+  public static void setupClass() throws IOException {
+    tempFolder = Files.createTempDirectory("BigQueryIOTest");
+  }
+
   @Before
   public void setUp() throws IOException {
-    MockitoAnnotations.initMocks(this);
     tables = HashBasedTable.create();
     BigQueryIO.clearCreatedTables();
   }
@@ -289,29 +281,53 @@ public class BigQueryIOTest implements Serializable {
     String tableId = "sometable";
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject(projectId);
-    bqOptions.setTempLocation("gs://testbucket/testdir");
+
+    Path baseDir = Files.createTempDirectory(tempFolder, "testValidateReadSetsDefaultProject");
+    bqOptions.setTempLocation(baseDir.toString());
 
     FakeDatasetService fakeDatasetService = new FakeDatasetService();
     fakeDatasetService.createDataset(projectId, datasetId, "", "");
     TableReference tableReference =
         new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId);
-    fakeDatasetService.createTable(new Table().setTableReference(tableReference));
+    fakeDatasetService.createTable(new Table()
+        .setTableReference(tableReference)
+        .setSchema(new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new TableFieldSchema().setName("number").setType("INTEGER")))));
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
         .withDatasetService(fakeDatasetService);
 
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", 1L),
+        new TableRow().set("name", "b").set("number", 2L),
+        new TableRow().set("name", "c").set("number", 3L),
+        new TableRow().set("name", "d").set("number", 4L),
+        new TableRow().set("name", "e").set("number", 5L),
+        new TableRow().set("name", "f").set("number", 6L));
+    fakeDatasetService.insertAll(tableReference, expected, null);
+
     Pipeline p = TestPipeline.create(bqOptions);
 
     TableReference tableRef = new TableReference();
     tableRef.setDatasetId(datasetId);
     tableRef.setTableId(tableId);
 
-    thrown.expect(RuntimeException.class);
-    // Message will be one of following depending on the execution environment.
-    thrown.expectMessage(Matchers.containsString("Unsupported"));
-    p.apply(BigQueryIO.read().from(tableRef)
-        .withTestServices(fakeBqServices));
+    PCollection<KV<String, Long>> output =
+        p.apply(BigQueryIO.read().from(tableRef).withTestServices(fakeBqServices))
+            .apply(ParDo.of(new DoFn<TableRow, KV<String, Long>>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) throws Exception {
+                c.output(KV.of((String) c.element().get("name"),
+                    Long.valueOf((String) c.element().get("number"))));
+              }
+            }));
+    PAssert.that(output).containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L),
+        KV.of("c", 3L), KV.of("d", 4L), KV.of("e", 5L), KV.of("f", 6L)));
+     p.run();
   }
 
   @Test
@@ -400,54 +416,32 @@ public class BigQueryIOTest implements Serializable {
     FakeDatasetService fakeDatasetService = new FakeDatasetService();
     fakeDatasetService.createDataset("non-executing-project", "somedataset", "", "");
     fakeDatasetService.createTable(sometable);
-    SerializableFunction<Void, Schema> schemaGenerator =
-        new SerializableFunction<Void, Schema>() {
-          @Override
-          public Schema apply(Void input) {
-            return BigQueryAvroUtils.toGenericAvroSchema(
-                "sometable",
-                ImmutableList.of(
-                    new TableFieldSchema().setName("name").setType("STRING"),
-                    new TableFieldSchema().setName("number").setType("INTEGER")));
-          }
-        };
-    Collection<Map<String, Object>> records =
-        ImmutableList.<Map<String, Object>>builder()
-            .add(ImmutableMap.<String, Object>builder().put("name", "a").put("number", 1L).build())
-            .add(ImmutableMap.<String, Object>builder().put("name", "b").put("number", 2L).build())
-            .add(ImmutableMap.<String, Object>builder().put("name", "c").put("number", 3L).build())
-            .build();
 
-    SerializableFunction<GenericJson, Void> onStartJob =
-        new WriteExtractFiles(schemaGenerator, records);
+    List<TableRow> records = Lists.newArrayList(
+        new TableRow().set("name", "a").set("number", 1L),
+        new TableRow().set("name", "b").set("number", 2L),
+        new TableRow().set("name", "c").set("number", 3L));
+    fakeDatasetService.insertAll(sometable.getTableReference(), records, null);
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
-           // .startJobReturns(onStartJob, "done")
-          //  .pollJobReturns(job)
-         //   .getJobReturns((Job) null)
-          //  .verifyExecutingProject(bqOptions.getProject()))
-        .withDatasetService(fakeDatasetService)
-        .readerReturns(
-            toJsonString(new TableRow().set("name", "a").set("number", 1)),
-            toJsonString(new TableRow().set("name", "b").set("number", 2)),
-            toJsonString(new TableRow().set("name", "c").set("number", 3)));
+        .withDatasetService(fakeDatasetService);
 
     Pipeline p = TestPipeline.create(bqOptions);
-    PCollection<String> output = p
+    PCollection<KV<String, Long>> output = p
         .apply(BigQueryIO.read().from("non-executing-project:somedataset.sometable")
             .withTestServices(fakeBqServices)
             .withoutValidation())
-        .apply(ParDo.of(new DoFn<TableRow, String>() {
+        .apply(ParDo.of(new DoFn<TableRow, KV<String, Long>>() {
           @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
-            c.output((String) c.element().get("name"));
+            c.output(KV.of((String) c.element().get("name"),
+                Long.valueOf((String) c.element().get("number"))));
           }
         }));
 
     PAssert.that(output)
-        .containsInAnyOrder(ImmutableList.of("a", "b", "c"));
-
+        .containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L), KV.of("c", 3L)));
     p.run();
   }
 
@@ -457,13 +451,12 @@ public class BigQueryIOTest implements Serializable {
     bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
+    FakeDatasetService datasetService = new FakeDatasetService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
-        //    .startJobReturns("done", "done", "done")
-        //    .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED))
-        .withDatasetService(mockDatasetService);
+        .withDatasetService(datasetService);
 
-    mockDatasetService.createDataset("defaultproject", "dataset-id", "", "");
+    datasetService.createDataset("defaultproject", "dataset-id", "", "");
 
     Pipeline p = TestPipeline.create(bqOptions);
     p.apply(Create.of(
@@ -715,11 +708,11 @@ public class BigQueryIOTest implements Serializable {
     bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
+    FakeDatasetService datasetService = new FakeDatasetService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService());
-       //     .startJobReturns("done", "done")
-        //    .pollJobReturns(Status.FAILED, Status.UNKNOWN));
-
+        .withJobService(new FakeJobService())
+        .withDatasetService(datasetService);
+    datasetService.createDataset("project-id", "dataset-id", "", "");
     Pipeline p = TestPipeline.create(bqOptions);
     p.apply(Create.of(
         new TableRow().set("name", "a").set("number", 1),
@@ -732,7 +725,7 @@ public class BigQueryIOTest implements Serializable {
         .withoutValidation());
 
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("UNKNOWN status of load job");
+    thrown.expectMessage("Failed to create load job");
     try {
       p.run();
     } finally {
@@ -747,10 +740,10 @@ public class BigQueryIOTest implements Serializable {
     bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
+    FakeDatasetService datasetService = new FakeDatasetService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService());
-         //   .startJobReturns("done", "done", "done")
-         //   .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED));
+        .withJobService(new FakeJobService())
+        .withDatasetService(datasetService);
 
     Pipeline p = TestPipeline.create(bqOptions);
     p.apply(Create.of(
@@ -817,7 +810,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryIO.Read read = BigQueryIO.read()
         .from("project:dataset.tableId")
         .withTestServices(new FakeBigQueryServices()
-            .withDatasetService(mockDatasetService)
+            .withDatasetService(new FakeDatasetService())
             .withJobService(new FakeJobService()))
         .withoutValidation();
 
@@ -833,7 +826,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryIO.Read read = BigQueryIO.read()
         .fromQuery("foobar")
         .withTestServices(new FakeBigQueryServices()
-            .withDatasetService(mockDatasetService)
+            .withDatasetService(new FakeDatasetService())
             .withJobService(new FakeJobService()))
         .withoutValidation();
 
@@ -874,7 +867,7 @@ public class BigQueryIOTest implements Serializable {
         .to("project:dataset.table")
         .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
         .withTestServices(new FakeBigQueryServices()
-          .withDatasetService(mockDatasetService)
+          .withDatasetService(new FakeDatasetService())
           .withJobService(new FakeJobService()))
         .withoutValidation();
 
@@ -1040,9 +1033,7 @@ public class BigQueryIOTest implements Serializable {
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
-        .withDatasetService(mockDatasetService);
-    when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
-        new RuntimeException("Unable to confirm BigQuery dataset presence"));
+        .withDatasetService(new FakeDatasetService());
 
     Pipeline p = TestPipeline.create(options);
 
@@ -1206,26 +1197,31 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBigQueryTableSourceThroughJsonAPI() throws Exception {
+    FakeDatasetService datasetService = new FakeDatasetService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
-        .readerReturns(
-            toJsonString(new TableRow().set("name", "a").set("number", "1")),
-            toJsonString(new TableRow().set("name", "b").set("number", "2")),
-            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+        .withDatasetService(datasetService);
 
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", "1"),
+        new TableRow().set("name", "b").set("number", "2"),
+        new TableRow().set("name", "c").set("number", "3"),
+        new TableRow().set("name", "d").set("number", "4"),
+        new TableRow().set("name", "e").set("number", "5"),
+        new TableRow().set("name", "f").set("number", "6"));
+
+    TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+    datasetService.createDataset(table.getProjectId(), table.getDatasetId(), "", "");
+    datasetService.createTable(new Table().setTableReference(table));
+    datasetService.insertAll(table, expected, null);
+
+    Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceThroughJsonAPI");
     String jobIdToken = "testJobIdToken";
-    TableReference table = BigQueryHelpers.parseTableSpec("project.data_set.table_name");
-    String extractDestinationDir = "mock://tempLocation";
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
         StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
-        extractDestinationDir, fakeBqServices,
+        baseDir.toString(), fakeBqServices,
         StaticValueProvider.of("project"));
 
-    List<TableRow> expected = ImmutableList.of(
-        new TableRow().set("name", "a").set("number", "1"),
-        new TableRow().set("name", "b").set("number", "2"),
-        new TableRow().set("name", "c").set("number", "3"));
-
     PipelineOptions options = PipelineOptionsFactory.create();
     Assert.assertThat(
         SourceTestUtils.readFromSource(bqSource, options),
@@ -1244,43 +1240,48 @@ public class BigQueryIOTest implements Serializable {
     extractJob.setStatus(new JobStatus())
         .setStatistics(jobStats);
 
+    FakeDatasetService fakeDatasetService = new FakeDatasetService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
-        .withDatasetService(mockDatasetService)
-        .readerReturns(
-            toJsonString(new TableRow().set("name", "a").set("number", "1")),
-            toJsonString(new TableRow().set("name", "b").set("number", "2")),
-            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+        .withDatasetService(fakeDatasetService);
+
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", 1L),
+        new TableRow().set("name", "b").set("number", 2L),
+        new TableRow().set("name", "c").set("number", 3L),
+        new TableRow().set("name", "d").set("number", 4L),
+        new TableRow().set("name", "e").set("number", 5L),
+        new TableRow().set("name", "f").set("number", 6L));
 
-    String jobIdToken = "testJobIdToken";
     TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
-    String extractDestinationDir = "mock://tempLocation";
+    fakeDatasetService.createDataset("project", "data_set", "", "");
+    fakeDatasetService.createTable(new Table().setTableReference(table)
+        .setSchema(new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new TableFieldSchema().setName("number").setType("INTEGER")))));
+    fakeDatasetService.insertAll(table, expected, null);
+
+    Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceInitSplit");
+
+    String jobIdToken = "testJobIdToken";
+    String extractDestinationDir = baseDir.toString();
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
         StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
         extractDestinationDir, fakeBqServices, StaticValueProvider.of("project"));
 
-    List<TableRow> expected = ImmutableList.of(
-        new TableRow().set("name", "a").set("number", "1"),
-        new TableRow().set("name", "b").set("number", "2"),
-        new TableRow().set("name", "c").set("number", "3"));
 
     PipelineOptions options = PipelineOptionsFactory.create();
-    options.setTempLocation("mock://tempLocation");
-
-    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
-    when(mockIOChannelFactory.resolve(anyString(), anyString()))
-        .thenReturn("mock://tempLocation/output");
-    when(mockDatasetService.getTable(any(TableReference.class)))
-        .thenReturn(new Table().setSchema(new TableSchema()));
+    options.setTempLocation(baseDir.toString());
 
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(bqSource, options),
-        CoreMatchers.is(expected));
+    List<TableRow> read = SourceTestUtils.readFromSource(bqSource, options);
+    assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
     SourceTestUtils.assertSplitAtFractionBehavior(
         bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
 
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
-    assertEquals(1, sources.size());
+    assertEquals(2, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
   }
@@ -1306,80 +1307,63 @@ public class BigQueryIOTest implements Serializable {
         .setStatistics(extractJobStats);
 
     FakeJobService fakeJobService = new FakeJobService();
+    FakeDatasetService fakeDatasetService = new FakeDatasetService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(fakeJobService)
-        .withDatasetService(mockDatasetService)
-        .readerReturns(
-            toJsonString(new TableRow().set("name", "a").set("number", "1")),
-            toJsonString(new TableRow().set("name", "b").set("number", "2")),
-            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+        .withDatasetService(fakeDatasetService);
+
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", 1L),
+        new TableRow().set("name", "b").set("number", 2L),
+        new TableRow().set("name", "c").set("number", 3L),
+        new TableRow().set("name", "d").set("number", 4L),
+        new TableRow().set("name", "e").set("number", 5L),
+        new TableRow().set("name", "f").set("number", 6L));
 
-    String jobIdToken = "testJobIdToken";
-    String extractDestinationDir = "mock://tempLocation";
     TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+    fakeDatasetService.createDataset("project", "data_set", "", "");
+    fakeDatasetService.createTable(new Table()
+        .setTableReference(destinationTable)
+        .setSchema(new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new TableFieldSchema().setName("number").setType("INTEGER")))));
+    Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryQuerySourceInitSplit");
+
+    String jobIdToken = "testJobIdToken";
+    String query = FakeBigQueryServices.encodeQuery(expected);
+    String extractDestinationDir = baseDir.toString();
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
-        StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
+        StaticValueProvider.of(jobIdToken), StaticValueProvider.of(query),
         StaticValueProvider.of(destinationTable),
         true /* flattenResults */, true /* useLegacySql */,
         extractDestinationDir, fakeBqServices);
 
-    List<TableRow> expected = ImmutableList.of(
-        new TableRow().set("name", "a").set("number", "1"),
-        new TableRow().set("name", "b").set("number", "2"),
-        new TableRow().set("name", "c").set("number", "3"));
-
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation(extractDestinationDir);
 
     TableReference queryTable = new TableReference()
-        .setProjectId("testproject")
-        .setDatasetId("testDataset")
-        .setTableId("testTable");
-  //  when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
-     //   .thenReturn(new JobStatistics().setQuery(
-     //       new JobStatistics2()
-     //           .setTotalBytesProcessed(100L)
-     //           .setReferencedTables(ImmutableList.of(queryTable))));
-    fakeJobService.expectDryRunQuery("testproject", "query",
+        .setProjectId("project")
+        .setDatasetId("data_set")
+        .setTableId("table_name");
+
+    fakeJobService.expectDryRunQuery("project", query,
         new JobStatistics().setQuery(
             new JobStatistics2()
                 .setTotalBytesProcessed(100L)
                 .setReferencedTables(ImmutableList.of(queryTable))));
 
-   // when(mockDatasetService.getTable(eq(queryTable)))
-     //   .thenReturn(new Table().setSchema(new TableSchema()));
-   // when(mockDatasetService.getTable(eq(destinationTable)))
-    //    .thenReturn(new Table().setSchema(new TableSchema()));
-    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
-    when(mockIOChannelFactory.resolve(anyString(), anyString()))
-        .thenReturn("mock://tempLocation/output");
-    //when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
-    //    .thenReturn(extractJob);
-
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(bqSource, options),
-        CoreMatchers.is(expected));
+    List<TableRow> read = SourceTestUtils.readFromSource(bqSource, options);
+    assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
     SourceTestUtils.assertSplitAtFractionBehavior(
         bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
 
+
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
-    assertEquals(1, sources.size());
+    assertEquals(2, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
-
-    /*
-    Mockito.verify(mockJobService)
-        .startQueryJob(
-            Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
-    Mockito.verify(mockJobService)
-        .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
-    Mockito.verify(mockDatasetService)
-        .createDataset(anyString(), anyString(), anyString(), anyString());
-    ArgumentCaptor<JobConfigurationQuery> queryConfigArg =
-        ArgumentCaptor.forClass(JobConfigurationQuery.class);
-    Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture());
-    assertEquals(true, queryConfigArg.getValue().getFlattenResults());
-    assertEquals(true, queryConfigArg.getValue().getUseLegacySql());*/
   }
 
   @Test
@@ -1402,68 +1386,60 @@ public class BigQueryIOTest implements Serializable {
     extractJob.setStatus(new JobStatus())
         .setStatistics(extractJobStats);
 
+    FakeDatasetService datasetService = new FakeDatasetService();
+    FakeJobService jobService = new FakeJobService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService())
-        .withDatasetService(mockDatasetService)
-        .readerReturns(
-            toJsonString(new TableRow().set("name", "a").set("number", "1")),
-            toJsonString(new TableRow().set("name", "b").set("number", "2")),
-            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+        .withJobService(jobService)
+        .withDatasetService(datasetService);
 
-    String jobIdToken = "testJobIdToken";
-    String extractDestinationDir = "mock://tempLocation";
     TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", 1L),
+        new TableRow().set("name", "b").set("number", 2L),
+        new TableRow().set("name", "c").set("number", 3L),
+        new TableRow().set("name", "d").set("number", 4L),
+        new TableRow().set("name", "e").set("number", 5L),
+        new TableRow().set("name", "f").set("number", 6L));
+    datasetService.createDataset(destinationTable.getProjectId(), destinationTable.getDatasetId(),
+        "", "");
+    Table table = new Table()
+        .setTableReference(destinationTable)
+        .setSchema(new TableSchema()
+                .setFields(
+                    ImmutableList.of(
+                        new TableFieldSchema().setName("name").setType("STRING"),
+                        new TableFieldSchema().setName("number").setType("INTEGER"))));
+    datasetService.createTable(table);
+
+    String query = FakeBigQueryServices.encodeQuery(expected);
+    jobService.expectDryRunQuery("project", query,
+        new JobStatistics().setQuery(
+            new JobStatistics2()
+                .setTotalBytesProcessed(100L)
+                .setReferencedTables(ImmutableList.of(table.getTableReference()))));
+
+    Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryNoTableQuerySourceInitSplit");
+    String jobIdToken = "testJobIdToken";
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
-        StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
+        StaticValueProvider.of(jobIdToken),
+        StaticValueProvider.of(query),
         StaticValueProvider.of(destinationTable),
-        true /* flattenResults */, true /* useLegacySql */,
-        extractDestinationDir, fakeBqServices);
+        true /* flattenResults */, true /* useLegacySql */, baseDir.toString(), fakeBqServices);
 
-    List<TableRow> expected = ImmutableList.of(
-        new TableRow().set("name", "a").set("number", "1"),
-        new TableRow().set("name", "b").set("number", "2"),
-        new TableRow().set("name", "c").set("number", "3"));
 
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setTempLocation(extractDestinationDir);
-
-    /*
-    when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
-        .thenReturn(new JobStatistics().setQuery(
-            new JobStatistics2()
-                .setTotalBytesProcessed(100L)));
-    when(mockDatasetService.getTable(eq(destinationTable)))
-        .thenReturn(new Table().setSchema(new TableSchema()));
-    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true);
-    when(mockIOChannelFactory.resolve(anyString(), anyString()))
-        .thenReturn("mock://tempLocation/output");
-    when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
-        .thenReturn(extractJob);*/
 
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(bqSource, options),
-        CoreMatchers.is(expected));
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setTempLocation(baseDir.toString());
+    List<TableRow> read = convertBigDecimaslToLong(
+        SourceTestUtils.readFromSource(bqSource, options));
+    assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
     SourceTestUtils.assertSplitAtFractionBehavior(
         bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
 
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
-    assertEquals(1, sources.size());
+    assertEquals(2, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
-
-    /*
-    Mockito.verify(Service)
-        .startQueryJob(
-            Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
-    Mockito.verify(mockJobService)
-        .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
-    Mockito.verify(mockDatasetService)
-        .createDataset(anyString(), anyString(), anyString(), anyString());
-    ArgumentCaptor<JobConfigurationQuery> queryConfigArg =
-        ArgumentCaptor.forClass(JobConfigurationQuery.class);
-    Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture());
-    assertEquals(true, queryConfigArg.getValue().getFlattenResults());
-    assertEquals(true, queryConfigArg.getValue().getUseLegacySql());*/
   }
 
   @Test
@@ -1604,12 +1580,27 @@ public class BigQueryIOTest implements Serializable {
       throws Exception {
     p.enableAbandonedNodeEnforcement(false);
 
+    // In the case where a static destination is specified (i.e. not through a dynamic table
+    // function) and there is no input data, WritePartition will generate an empty table. This
+    // code is to test that path.
+    TableReference singletonReference = new TableReference()
+        .setProjectId("projectid")
+        .setDatasetId("dataset")
+        .setTableId("table");
+    String singletonDescription = "singleton";
+    boolean isSingleton = numTables == 1 && numFilesPerTable == 0;
+
     List<ShardedKey<TableDestination>> expectedPartitions = Lists.newArrayList();
-    for (int i = 0; i < numTables; ++i) {
-      for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) {
-        String tableName = String.format("project-id:dataset-id.tables%05d", i);
-        TableDestination destination = new TableDestination(tableName, tableName);
-        expectedPartitions.add(ShardedKey.of(destination, j));
+    if (isSingleton) {
+      expectedPartitions.add(ShardedKey.of(
+          new TableDestination(singletonReference, singletonDescription), 1));
+    } else {
+      for (int i = 0; i < numTables; ++i) {
+        for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) {
+          String tableName = String.format("project-id:dataset-id.tables%05d", i);
+          TableDestination destination = new TableDestination(tableName, tableName);
+          expectedPartitions.add(ShardedKey.of(destination, j));
+        }
       }
     }
 
@@ -1642,11 +1633,7 @@ public class BigQueryIOTest implements Serializable {
         WriteBundlesToFiles.ResultCoder.of());
 
     ValueProvider<String> singletonTable = null;
-    if (numFilesPerTable == 0 && numTables == 1) {
-      TableReference singletonReference = new TableReference()
-          .setProjectId("projectid")
-          .setDatasetId("dataset")
-          .setTableId("table");
+    if (isSingleton) {
       singletonTable = StaticValueProvider.of(BigQueryHelpers.toJsonString(singletonReference));
     }
     WritePartition writePartition =
@@ -1680,12 +1667,10 @@ public class BigQueryIOTest implements Serializable {
       tableFilesResult.addAll(partition.getValue());
     }
 
-    assertEquals(expectedPartitions.size(), partitionsResult.size());
+    assertThat(partitionsResult,
+        containsInAnyOrder(Iterables.toArray(expectedPartitions, ShardedKey.class)));
 
-   // assertThat(partitionsResult,
-     //   containsInAnyOrder(Iterables.toArray(expectedPartitions, ShardedKey.class)));
-
-    if (numFilesPerTable == 0 && numTables == 1) {
+    if (isSingleton) {
       assertEquals(1, filesPerTableResult.size());
       List<String> singletonFiles = filesPerTableResult.values().iterator().next();
       assertTrue(Files.exists(Paths.get(singletonFiles.get(0))));
@@ -1700,15 +1685,11 @@ public class BigQueryIOTest implements Serializable {
   public void testWriteTables() throws Exception {
     p.enableAbandonedNodeEnforcement(false);
 
+    FakeDatasetService datasetService = new FakeDatasetService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
-        //    .startJobReturns("done", "done", "done", "done", "done", "done", "done", "done",
-       //         "done", "done")
-       //     .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED,
-       //         Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED,
-      //          Status.SUCCEEDED, Status.SUCCEEDED))
-        .withDatasetService(mockDatasetService);
-
+        .withDatasetService(datasetService);
+    datasetService.createDataset("project-id", "dataset-id", "", "");
     long numTables = 3;
     long numPartitions = 3;
     long numFilesPerPartition = 10;
@@ -1716,6 +1697,8 @@ public class BigQueryIOTest implements Serializable {
     String tempFilePrefix = "tempFilePrefix";
     Map<TableDestination, List<String>> expectedTempTables = Maps.newHashMap();
 
+    Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables");
+
     List<KV<ShardedKey<TableDestination>, Iterable<List<String>>>> partitions =
         Lists.newArrayList();
     for (int i = 0; i < numTables; ++i) {
@@ -1726,7 +1709,16 @@ public class BigQueryIOTest implements Serializable {
             jobIdToken + "_0x%08x_%05d", tableDestination.hashCode(), j);
         List<String> filesPerPartition = Lists.newArrayList();
         for (int k = 0; k < numFilesPerPartition; ++k) {
-          filesPerPartition.add(String.format("files0x%08x_%05d", tableDestination.hashCode(), k));
+          String filename = Paths.get(baseDir.toString(),
+              String.format("files0x%08x_%05d", tempTableId.hashCode(), k)).toString();
+          try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.TEXT)) {
+            try (OutputStream output = Channels.newOutputStream(channel)) {
+              TableRow tableRow = new TableRow().set("name", tableName);
+              TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
+              output.write("\n".getBytes(StandardCharsets.UTF_8));
+            }
+          }
+          filesPerPartition.add(filename);
         }
         partitions.add(KV.of(ShardedKey.of(tableDestination, j),
             (Iterable<List<String>>) Collections.singleton(filesPerPartition)));
@@ -1814,25 +1806,45 @@ public class BigQueryIOTest implements Serializable {
   public void testWriteRename() throws Exception {
     p.enableAbandonedNodeEnforcement(false);
 
+    FakeDatasetService datasetService = new FakeDatasetService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService())
-         //   .startJobReturns("done", "done")
-        //    .pollJobReturns(Status.FAILED, Status.SUCCEEDED))
-        .withDatasetService(mockDatasetService);
+        .withDatasetService(datasetService);
+    datasetService.createDataset("project-id", "dataset-id", "", "");
 
-    int numFinalTables = 3;
-    int numTempTables = 3;
+    final int numFinalTables = 3;
+    final int numTempTablesPerFinalTable = 3;
+    final int numRecordsPerTempTable = 10;
+
+    Map<TableDestination, List<TableRow>> expectedRowsPerTable = Maps.newHashMap();
     String jobIdToken = "jobIdToken";
-    String jsonTable = "{}";
     Map<TableDestination, Iterable<String>> tempTables = Maps.newHashMap();
     for (int i = 0; i < numFinalTables; ++i) {
       String tableName = "project-id:dataset-id.table_" + i;
-      TableDestination tableDestination = new TableDestination(tableName, tableName);
+      TableDestination tableDestination = new TableDestination(
+          tableName, "table_" + i + "_desc");
       List<String> tables = Lists.newArrayList();
       tempTables.put(tableDestination, tables);
-      for (int j = 0; i < numTempTables; ++i) {
-        tables.add(String.format(
-            "{\"project-id:dataset-id.tableId\":\"%s_%05d_%05d\"}", jobIdToken, i, j));
+
+      List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
+      if (expectedRows == null) {
+        expectedRows = Lists.newArrayList();
+        expectedRowsPerTable.put(tableDestination, expectedRows);
+      }
+      for (int j = 0; j < numTempTablesPerFinalTable; ++j) {  // one temp table per (i, j)
+        TableReference tempTable = new TableReference()
+            .setProjectId("project-id")
+            .setDatasetId("dataset-id")
+            .setTableId(String.format("%s_%05d_%05d", jobIdToken, i, j));
+        datasetService.createTable(new Table().setTableReference(tempTable));
+
+        List<TableRow> rows = Lists.newArrayList();
+        for (int k = 0; k < numRecordsPerTempTable; ++k) {
+          rows.add(new TableRow().set("number", j * numTempTablesPerFinalTable + k));
+        }
+        datasetService.insertAll(tempTable, rows, null);
+        expectedRows.addAll(rows);
+        tables.add(BigQueryHelpers.toJsonString(tempTable));
       }
     }
 
@@ -1857,37 +1869,52 @@ public class BigQueryIOTest implements Serializable {
     tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     tester.processElement(null);
+
+    for (Map.Entry<TableDestination, Iterable<String>> entry : tempTables.entrySet()) {
+      TableDestination tableDestination = entry.getKey();
+      TableReference tableReference = tableDestination.getTableReference();
+      Table table = checkNotNull(datasetService.getTable(tableReference));
+      assertEquals(tableReference.getTableId() + "_desc", tableDestination.getTableDescription());
+
+      List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
+      assertThat(datasetService.getAllRows(tableReference.getProjectId(),
+          tableReference.getDatasetId(), tableReference.getTableId()),
+          containsInAnyOrder(Iterables.toArray(expectedRows, TableRow.class)));
+
+      // Temp tables should be deleted.
+      for (String tempTableJson : entry.getValue()) {
+        TableReference tempTable = BigQueryHelpers.fromJsonString(
+            tempTableJson, TableReference.class);
+        assertEquals(null, datasetService.getTable(tempTable));
+      }
+    }
   }
 
   @Test
   public void testRemoveTemporaryTables() throws Exception {
-    String projectId = "someproject";
-    String datasetId = "somedataset";
-    List<String> tables = Lists.newArrayList("table1", "table2", "table3");
+    FakeDatasetService datasetService = new FakeDatasetService();
+    String projectId = "project";
+    String datasetId = "dataset";
+    datasetService.createDataset(projectId, datasetId, "", "");
     List<TableReference> tableRefs = Lists.newArrayList(
-        BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
-            tables.get(0))),
-        BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
-            tables.get(1))),
-        BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
-            tables.get(2))));
+        BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table1")),
+        BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table2")),
+        BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table3")));
+    for (TableReference tableRef : tableRefs) {
+      datasetService.createTable(new Table().setTableReference(tableRef));
+    }
 
-    doThrow(new IOException("Unable to delete table"))
-        .when(mockDatasetService).deleteTable(tableRefs.get(0));
-    doNothing().when(mockDatasetService).deleteTable(tableRefs.get(1));
-    doNothing().when(mockDatasetService).deleteTable(tableRefs.get(2));
+    // Add one more table to delete that does not actually exist.
+    tableRefs.add(
+        BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table4")));
 
-    WriteRename.removeTemporaryTables(mockDatasetService, tableRefs);
+    WriteRename.removeTemporaryTables(datasetService, tableRefs);
 
     for (TableReference ref : tableRefs) {
       loggedWriteRename.verifyDebug("Deleting table " + toJsonString(ref));
+      checkState(datasetService.getTable(ref) == null,
+          "Table " + ref + " was not deleted!");
     }
-    loggedWriteRename.verifyWarn("Failed to delete the table "
-        + toJsonString(tableRefs.get(0)));
-    loggedWriteRename.verifyNotLogged("Failed to delete the table "
-        + toJsonString(tableRefs.get(1)));
-    loggedWriteRename.verifyNotLogged("Failed to delete the table "
-        + toJsonString(tableRefs.get(2)));
   }
 
   /** Test options. **/
@@ -1957,43 +1984,6 @@ public class BigQueryIOTest implements Serializable {
       }}).length);
   }
 
-  private class WriteExtractFiles implements SerializableFunction<GenericJson, Void> {
-    private final SerializableFunction<Void, Schema> schemaGenerator;
-    private final Collection<Map<String, Object>> records;
-
-    private WriteExtractFiles(
-        SerializableFunction<Void, Schema> schemaGenerator,
-        Collection<Map<String, Object>> records) {
-      this.schemaGenerator = schemaGenerator;
-      this.records = records;
-    }
-
-    @Override
-    public Void apply(GenericJson input) {
-      List<String> destinations = (List<String>) input.get("destinationUris");
-      for (String destination : destinations) {
-        String newDest = destination.replace("*", "000000000000");
-        Schema schema = schemaGenerator.apply(null);
-        try (WritableByteChannel channel = IOChannelUtils.create(newDest, MimeTypes.BINARY);
-            DataFileWriter<GenericRecord> tableRowWriter =
-                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))
-                    .create(schema, Channels.newOutputStream(channel))) {
-          for (Map<String, Object> record : records) {
-            GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schema);
-            for (Map.Entry<String, Object> field : record.entrySet()) {
-              genericRecordBuilder.set(field.getKey(), field.getValue());
-            }
-            tableRowWriter.append(genericRecordBuilder.build());
-          }
-        } catch (IOException e) {
-          throw new IllegalStateException(
-              String.format("Could not create destination for extract job %s", destination), e);
-        }
-      }
-      return null;
-    }
-  }
-
   @Test
   public void testShardedKeyCoderIsSerializableWithWellKnownCoderType() {
     CoderProperties.coderSerializable(ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE));
@@ -2013,4 +2003,19 @@ public class BigQueryIOTest implements Serializable {
                 TableRowInfoCoder.of()),
             IntervalWindow.getCoder()));
   }
+
+  /** Returns copies of the rows, with any BigDecimal "number" value turned back into a long. */
+  List<TableRow> convertBigDecimaslToLong(List<TableRow> toConvert) {
+    List<TableRow> result = Lists.newArrayList();
+    for (TableRow original : toConvert) {
+      TableRow copy = original.clone();
+      // Numbers come back as BigDecimal after JSON serialization; assertions expect longs.
+      Object number = copy.get("number");
+      if (number instanceof BigDecimal) {
+        copy.set("number", ((BigDecimal) number).longValue());
+      }
+      result.add(copy);
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
index ed3ab37..6dfd9d7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -1,39 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
 import static org.junit.Assert.assertEquals;
 
+import com.google.api.client.util.Base64;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.Lists;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.List;
 import java.util.NoSuchElementException;
+
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.options.BigQueryOptions;
 
 
 /**
- * Created by relax on 3/30/17.
+ * A fake implementation of BigQuery's query service.
  */
 class FakeBigQueryServices implements BigQueryServices {
-  private String[] jsonTableRowReturns = new String[0];
   private JobService jobService;
-  private DatasetService datasetService;
+  private FakeDatasetService datasetService;
 
-  public FakeBigQueryServices withJobService(JobService jobService) {
+  FakeBigQueryServices withJobService(JobService jobService) {
     this.jobService = jobService;
     return this;
   }
 
-  public FakeBigQueryServices withDatasetService(DatasetService datasetService) {
+  FakeBigQueryServices withDatasetService(FakeDatasetService datasetService) {
     this.datasetService = datasetService;
     return this;
   }
 
-  public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) {
-    this.jsonTableRowReturns = jsonTableRowReturns;
-    return this;
-  }
-
   @Override
   public JobService getJobService(BigQueryOptions bqOptions) {
     return jobService;
@@ -45,26 +65,58 @@ class FakeBigQueryServices implements BigQueryServices {
   }
 
   @Override
-  public BigQueryJsonReader getReaderFromTable(
-      BigQueryOptions bqOptions, TableReference tableRef) {
-    return new FakeBigQueryReader(jsonTableRowReturns);
+  public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) {
+    try {
+      return new FakeBigQueryReader(datasetService.getAllRows(
+          tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()));
+    } catch (Exception e) {
+      // Propagate the failure together with its cause rather than returning a null reader.
+      throw new IllegalStateException("Unable to read rows of table " + tableRef, e);
+    }
   }
 
   @Override
   public BigQueryJsonReader getReaderFromQuery(
       BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
-    return new FakeBigQueryReader(jsonTableRowReturns);
+    try {
+      // The "query" of this fake is a Base64 row list (see encodeQuery); decode and replay it.
+      return new FakeBigQueryReader(rowsFromEncodedQuery(queryConfig.getQuery()));
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to decode rows from the fake query", e);
+    }
+  }
+
+  /** Decodes the rows packed into a Base64 "query" string and normalizes their numbers. */
+  static List<TableRow> rowsFromEncodedQuery(String query) throws IOException {
+    ByteArrayInputStream encoded = new ByteArrayInputStream(Base64.decodeBase64(query));
+    List<TableRow> decoded = ListCoder.of(TableRowJsonCoder.of()).decode(encoded, Context.OUTER);
+    for (TableRow decodedRow : decoded) {
+      convertNumbers(decodedRow);
+    }
+    return decoded;
+  }
+
+  /** Packs rows into a Base64 string that {@link #rowsFromEncodedQuery} can decode. */
+  static String encodeQuery(List<TableRow> rows) throws IOException {
+    ByteArrayOutputStream encoded = new ByteArrayOutputStream();
+    ListCoder.of(TableRowJsonCoder.of()).encode(rows, encoded, Context.OUTER);
+    return Base64.encodeBase64String(encoded.toByteArray());
   }
 
   private static class FakeBigQueryReader implements BigQueryJsonReader {
     private static final int UNSTARTED = -1;
     private static final int CLOSED = Integer.MAX_VALUE;
 
-    private String[] jsonTableRowReturns;
+    private List<byte[]> serializedTableRowReturns;
     private int currIndex;
 
-    FakeBigQueryReader(String[] jsonTableRowReturns) {
-      this.jsonTableRowReturns = jsonTableRowReturns;
+    FakeBigQueryReader(List<TableRow> tableRowReturns) throws IOException {
+      this.serializedTableRowReturns = Lists.newArrayListWithExpectedSize(tableRowReturns.size());
+      for (TableRow tableRow : tableRowReturns) {
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
+        serializedTableRowReturns.add(output.toByteArray());
+      }
       this.currIndex = UNSTARTED;
     }
 
@@ -72,20 +124,27 @@ class FakeBigQueryServices implements BigQueryServices {
     public boolean start() throws IOException {
       assertEquals(UNSTARTED, currIndex);
       currIndex = 0;
-      return currIndex < jsonTableRowReturns.length;
+      return currIndex < serializedTableRowReturns.size();
     }
 
     @Override
     public boolean advance() throws IOException {
-      return ++currIndex < jsonTableRowReturns.length;
+      return ++currIndex < serializedTableRowReturns.size();
     }
 
     @Override
     public TableRow getCurrent() throws NoSuchElementException {
-      if (currIndex >= jsonTableRowReturns.length) {
+      if (currIndex < 0 || currIndex >= serializedTableRowReturns.size()) {
         throw new NoSuchElementException();
       }
-      return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class);
+      // Rows are held in serialized form; a fresh TableRow is decoded on every call.
+      ByteArrayInputStream input = new ByteArrayInputStream(
+          serializedTableRowReturns.get(currIndex));
+      try {
+        return convertNumbers(TableRowJsonCoder.of().decode(input, Context.OUTER));
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to decode row " + currIndex, e);
+      }
     }
 
     @Override
@@ -93,4 +152,15 @@ class FakeBigQueryServices implements BigQueryServices {
       currIndex = CLOSED;
     }
   }
+
+
+  /** Converts Integer values back to Longs in place; JSON serialization tends to narrow them. */
+  static TableRow convertNumbers(TableRow tableRow) {
+    for (TableRow.Entry<String, Object> entry : tableRow.entrySet()) {
+      if (entry.getValue() instanceof Integer) {
+        entry.setValue(Long.valueOf(((Integer) entry.getValue()).longValue()));
+      }
+    }
+    return tableRow;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index 9b2cf63..5103adb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -1,9 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpHeaders;
 import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -24,13 +44,13 @@ class FakeDatasetService implements DatasetService, Serializable {
       throws InterruptedException, IOException {
     synchronized (BigQueryIOTest.tables) {
       Map<String, TableContainer> dataset =
-          checkNotNull(
-              BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId()),
-              "Tried to get a dataset %s:%s from %s, but no such dataset was set",
-              tableRef.getProjectId(),
-              tableRef.getDatasetId(),
-              tableRef.getTableId(),
-              FakeDatasetService.class.getSimpleName());
+              BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
+      if (dataset == null) {
+        throwNotFound(
+            "Tried to get a dataset %s:%s from, but no such dataset was set",
+            tableRef.getProjectId(),
+            tableRef.getDatasetId());
+      }
       TableContainer tableContainer = dataset.get(tableRef.getTableId());
       return tableContainer == null ? null : tableContainer.getTable();
     }
@@ -44,27 +64,40 @@ class FakeDatasetService implements DatasetService, Serializable {
   }
 
   private TableContainer getTableContainer(String projectId, String datasetId, String tableId)
-          throws InterruptedException, IOException {
-     synchronized (BigQueryIOTest.tables) {
-       Map<String, TableContainer> dataset =
-           checkNotNull(
-               BigQueryIOTest.tables.get(projectId, datasetId),
-               "Tried to get a dataset %s:%s from %s, but no such dataset was set",
-               projectId,
-               datasetId,
-               FakeDatasetService.class.getSimpleName());
-       return checkNotNull(dataset.get(tableId),
-           "Tried to get a table %s:%s.%s from %s, but no such table was set",
-           projectId,
-           datasetId,
-           tableId,
-           FakeDatasetService.class.getSimpleName());
-     }
+      throws InterruptedException, IOException {
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(projectId, datasetId);
+      if (dataset == null) {
+        throwNotFound(
+            "Tried to get a dataset %s:%s, but no such dataset was set",
+            projectId,
+            datasetId);
+      }
+      TableContainer tableContainer = dataset.get(tableId);
+      if (tableContainer == null) {
+        throwNotFound(
+            "Tried to get a table %s:%s.%s, but no such table was set",
+            projectId,
+            datasetId,
+            tableId);
+      }
+      return tableContainer;
+    }
   }
 
   @Override
   public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
-    throw new UnsupportedOperationException("Unsupported");
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset =
+          BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
+      if (dataset == null) {
+        throwNotFound(
+            "Tried to get a dataset %s:%s, but no such dataset was set",
+            tableRef.getProjectId(), tableRef.getDatasetId());
+      }
+      // Removing a table that is not in the dataset is a no-op.
+      dataset.remove(tableRef.getTableId());
+    }
   }
 
 
@@ -73,13 +106,13 @@ class FakeDatasetService implements DatasetService, Serializable {
     TableReference tableReference = table.getTableReference();
     synchronized (BigQueryIOTest.tables) {
       Map<String, TableContainer> dataset =
-          checkNotNull(
-              BigQueryIOTest.tables.get(tableReference.getProjectId(),
-                  tableReference.getDatasetId()),
-              "Tried to get a dataset %s:%s from %s, but no such table was set",
-              tableReference.getProjectId(),
-              tableReference.getDatasetId(),
-              FakeDatasetService.class.getSimpleName());
+          BigQueryIOTest.tables.get(tableReference.getProjectId(), tableReference.getDatasetId());
+      if (dataset == null) {
+        throwNotFound(
+            "Tried to get a dataset %s:%s, but no such table was set",
+            tableReference.getProjectId(),
+            tableReference.getDatasetId());
+      }
       TableContainer tableContainer = dataset.get(tableReference.getTableId());
       if (tableContainer == null) {
         tableContainer = new TableContainer(table);
@@ -98,7 +131,16 @@ class FakeDatasetService implements DatasetService, Serializable {
   @Override
   public Dataset getDataset(
       String projectId, String datasetId) throws IOException, InterruptedException {
-    throw new UnsupportedOperationException("Unsupported");
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(projectId, datasetId);
+      if (dataset == null) {
+        throwNotFound(
+            "Tried to get a dataset %s:%s, but no such dataset was set", projectId, datasetId);
+      }
+      return new Dataset().setDatasetReference(new DatasetReference()
+          .setDatasetId(datasetId)
+          .setProjectId(projectId));
+    }
   }
 
   @Override
@@ -117,7 +159,9 @@ class FakeDatasetService implements DatasetService, Serializable {
   @Override
   public void deleteDataset(String projectId, String datasetId)
       throws IOException, InterruptedException {
-    throw new UnsupportedOperationException("Unsupported");
+    synchronized (BigQueryIOTest.tables) {
+      BigQueryIOTest.tables.remove(projectId, datasetId);
+    }
   }
 
   @Override
@@ -138,8 +182,7 @@ class FakeDatasetService implements DatasetService, Serializable {
       TableContainer tableContainer = getTableContainer(
           ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
       for (int i = 0; i < rowList.size(); ++i) {
-        tableContainer.addRow(rowList.get(i), insertIdList.get(i));
-        dataSize += rowList.get(i).toString().length();
+        dataSize += tableContainer.addRow(rowList.get(i), insertIdList.get(i));
       }
       return dataSize;
     }
@@ -150,23 +193,16 @@ class FakeDatasetService implements DatasetService, Serializable {
                                      @Nullable String tableDescription)
       throws IOException, InterruptedException {
     synchronized (BigQueryIOTest.tables) {
-      Map<String, TableContainer> dataset =
-          checkNotNull(
-              BigQueryIOTest.tables.get(tableReference.getProjectId(),
-                  tableReference.getDatasetId()),
-              "Tried to get a dataset %s:%s from %s, but no such dataset was set",
-              tableReference.getProjectId(),
-              tableReference.getDatasetId(),
-              tableReference.getTableId(),
-              FakeDatasetService.class.getSimpleName());
-      TableContainer tableContainer = checkNotNull(dataset.get(tableReference.getTableId()),
-          "Tried to patch a table %s:%s.%s from %s, but no such table was set",
-          tableReference.getProjectId(),
-          tableReference.getDatasetId(),
-          tableReference.getTableId(),
-          FakeDatasetService.class.getSimpleName());
+      TableContainer tableContainer = getTableContainer(tableReference.getProjectId(),
+          tableReference.getDatasetId(), tableReference.getTableId());
       tableContainer.getTable().setDescription(tableDescription);
       return tableContainer.getTable();
     }
   }
+
+  void throwNotFound(String format, Object... args) throws IOException {
+    throw new IOException(
+        new GoogleJsonResponseException.Builder(404,
+            String.format(format, args), new HttpHeaders()).build());
+  }
 }