Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:48 UTC
[14/50] [abbrv] beam git commit: Fix tests to properly fake out
BigQueryService, and add tests for dynamic-table functionality.
Fix tests to properly fake out BigQueryService, and add tests for dynamic-table functionality.
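
The recurring pattern in the diff below replaces Mockito mocks with in-memory fakes that are seeded with data and handed to the transform under test. A minimal sketch of that wiring, using the fake classes from this commit (the dataset, table, and rows are illustrative, and `rows` stands for a List<TableRow> built by the test):

FakeDatasetService datasetService = new FakeDatasetService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
    .withJobService(new FakeJobService())
    .withDatasetService(datasetService);

// Seed the fake with a dataset, a table, and rows, then exercise the real
// read path against the fake services instead of stubbed mock calls.
datasetService.createDataset("project", "data_set", "", "");
TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
datasetService.createTable(new Table().setTableReference(table));
datasetService.insertAll(table, rows, null);

p.apply(BigQueryIO.read()
    .from(table)
    .withTestServices(fakeBqServices)
    .withoutValidation());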
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b486137d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b486137d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b486137d
Branch: refs/heads/DSL_SQL
Commit: b486137d2190db9212a92176f703e6ed7858fe59
Parents: 760a945
Author: Reuven Lax <re...@google.com>
Authored: Fri Mar 31 14:16:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:50 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 7 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 15 +-
.../beam/sdk/io/gcp/bigquery/ShardedKey.java | 2 +-
.../sdk/io/gcp/bigquery/StreamingInserts.java | 5 +-
.../sdk/io/gcp/bigquery/StreamingWriteFn.java | 1 -
.../sdk/io/gcp/bigquery/TableDestination.java | 3 +-
.../sdk/io/gcp/bigquery/TableRowWriter.java | 3 +-
.../sdk/io/gcp/bigquery/TagWithUniqueIds.java | 9 -
.../io/gcp/bigquery/WriteBundlesToFiles.java | 12 +-
.../sdk/io/gcp/bigquery/WritePartition.java | 13 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 613 ++++++++++---------
.../io/gcp/bigquery/FakeBigQueryServices.java | 114 +++-
.../sdk/io/gcp/bigquery/FakeDatasetService.java | 138 +++--
.../sdk/io/gcp/bigquery/FakeJobService.java | 182 +++++-
.../sdk/io/gcp/bigquery/TableContainer.java | 33 +-
15 files changed, 703 insertions(+), 447 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 5e80fae..06fdfce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -58,9 +58,8 @@ import org.apache.beam.sdk.values.TupleTagList;
/**
* PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
*/
-class BatchLoads<T> extends
- PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
- BigQueryIO.Write<T> write;
+class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
+ BigQueryIO.Write<?> write;
private static class ConstantSchemaFunction implements
SerializableFunction<TableDestination, TableSchema> {
@@ -79,7 +78,7 @@ class BatchLoads<T> extends
}
}
- BatchLoads(BigQueryIO.Write<T> write) {
+ BatchLoads(BigQueryIO.Write<?> write) {
this.write = write;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f1baaf7..54a25c7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -64,7 +64,6 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.Transport;
@@ -536,7 +535,7 @@ public class BigQueryIO {
}
}
if (extractFiles != null && !extractFiles.isEmpty()) {
- new GcsUtilFactory().create(options).remove(extractFiles);
+ IOChannelUtils.getFactory(extractFiles.iterator().next()).remove(extractFiles);
}
}
};
@@ -701,8 +700,8 @@ public class BigQueryIO {
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef);
- abstract Builder<T> setTableRefFunction(
- SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction);
+ abstract Builder<T> setTableFunction(
+ SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction);
abstract Builder<T> setFormatFunction(
SerializableFunction<T, TableRow> formatFunction);
abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema);
@@ -823,8 +822,7 @@ public class BigQueryIO {
* {@link ValueInSingleWindow}, so can be determined by the value or by the window.
*/
public Write<T> to(
- SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
- return toTableReference(new TranslateTableSpecFunction<T>(tableSpecFunction));
+ SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {
ensureToNotCalledYet();
return toBuilder().setTableFunction(tableFunction).build();
}
@@ -834,7 +832,7 @@ public class BigQueryIO {
* {@link TableReference} instead of a string table specification.
*/
private Write<T> toTableReference(
- SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction) {
+ SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {
ensureToNotCalledYet();
return toBuilder().setTableFunction(tableFunction).build();
}
@@ -984,8 +982,7 @@ public class BigQueryIO {
if (input.isBounded() == IsBounded.UNBOUNDED) {
return rowsWithDestination.apply(new StreamingInserts(this));
} else {
-
- return rowsWithDestination.apply(new BatchLoads<T>(this));
+ return rowsWithDestination.apply(new BatchLoads(this));
}
}
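
The signature change in to() above is the user-facing half of the dynamic-table functionality named in the commit message: instead of a function returning a table-spec string, callers now supply a function from a windowed element to a TableDestination, so the destination can be chosen per value or per window and can carry a description. A hedged usage sketch; the BigQueryIO.write() entry point and the "shard" field are assumptions, not taken from this diff:

rows.apply(BigQueryIO.write()  // assumed entry point yielding a Write<TableRow>
    .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
      @Override
      public TableDestination apply(ValueInSingleWindow<TableRow> value) {
        // Route each element to a table derived from its contents.
        String tableSpec = "project:dataset.table_" + value.getValue().get("shard");
        return new TableDestination(tableSpec, "dynamically chosen destination");
      }
    }));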
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
index ab57446..09b4fbf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
@@ -56,7 +56,7 @@ class ShardedKey<K> implements Serializable {
return false;
}
ShardedKey<K> other = (ShardedKey<K>) o;
- return (key == other.key) && (shardNumber == other.shardNumber);
+ return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber);
}
@Override
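
The one-line change above fixes a genuine equality bug: == on reference types compares object identity, so two logically equal ShardedKeys (and, in TableDestination below, two equal tableSpec strings) would compare unequal whenever they were distinct instances. Objects.equals is the null-safe value comparison; the class presumably also gains a java.util.Objects import outside this hunk. The standard companion idiom, sketched here for reference, keeps hashCode consistent with the new equals:

@Override
public int hashCode() {
  // Must agree with equals: equal (key, shardNumber) pairs hash identically.
  return Objects.hash(key, shardNumber);
}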
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index 37afbdf..ced1d66 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -38,9 +38,8 @@ import org.apache.beam.sdk.values.PCollection;
* PTransform that performs a streaming BigQuery write. To increase consistency,
* it leverages BigQuery's best-effort de-dup mechanism.
*/
-
-class StreamingInserts
- extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
+class StreamingInserts extends PTransform<PCollection<KV<TableDestination, TableRow>>,
+ WriteResult> {
private final Write<?> write;
private static class ConstantSchemaFunction implements
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index 83ed3d2..22b2078 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -98,7 +98,6 @@ class StreamingWriteFn
private void flushRows(TableReference tableReference,
List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options)
throws InterruptedException {
- System.out.println("FlUSHING ROWS " + tableRows.size());
if (!tableRows.isEmpty()) {
try {
long totalBytes = bqServices.getDatasetService(options).insertAll(
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index e8538e0..36e1401 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -64,7 +64,8 @@ public class TableDestination implements Serializable {
return false;
}
TableDestination other = (TableDestination) o;
- return (tableSpec == other.tableSpec) && (tableDescription == other.tableDescription);
+ return Objects.equals(this.tableSpec, other.tableSpec)
+ && Objects.equals(this.tableDescription, other.tableDescription);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
index a1f6153..ee8f466 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +56,7 @@ class TableRowWriter {
}
}
TableRowWriter(String basename) {
- this.tempFilePrefix = basename;
+ this.tempFilePrefix = basename;
}
public final void open(String uId) throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index 6f0186e..7379784 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -18,23 +18,14 @@
package org.apache.beam.sdk.io.gcp.bigquery;
-import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
import java.io.IOException;
import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.ValueInSingleWindow;
/**
* Fn that tags each table row with a unique id and destination table.
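
For context on this Fn's Javadoc: BigQuery's streaming insert API deduplicates best-effort on a per-row insert id, so tagging each row with a fresh UUID before the write lets a retried bundle resend the same ids and collapse into a single logical insert (the ids reach the service via the uniqueIds list seen in StreamingWriteFn.flushRows above). A sketch of the tagging idea; KV, TableRow, UUID, and the java.util collections are already imported in these files, and the output shape here is illustrative rather than the Fn's actual signature:

static List<KV<String, TableRow>> tagWithUniqueIds(List<TableRow> rows) {
  List<KV<String, TableRow>> tagged = new ArrayList<>(rows.size());
  for (TableRow row : rows) {
    // One fresh UUID per row; a retried duplicate keeps the same id and is
    // dropped server-side (best effort, not a guarantee).
    tagged.add(KV.of(UUID.randomUUID().toString(), row));
  }
  return tagged;
}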
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index b8069f6..869e68a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -19,19 +19,16 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableRow;
-
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
-
-import com.google.common.collect.Maps;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -50,6 +47,10 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
private transient Map<TableDestination, TableRowWriter> writers;
private final String tempFilePrefix;
+ /**
+ * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
+ * and encapsulates the table it is destined for as well as the file's byte size.
+ */
public static class Result implements Serializable {
public String filename;
public Long fileByteSize;
@@ -62,6 +63,9 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
}
}
+ /**
+ * A coder for the {@link Result} class.
+ */
public static class ResultCoder extends AtomicCoder<Result> {
private static final ResultCoder INSTANCE = new ResultCoder();
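
Given the StringUtf8Coder and VarLongCoder imports added to this file, ResultCoder plausibly delegates one existing coder per field. A sketch under that assumption; whether Result carries fields beyond the two shown, the field order, and the constructor are all guesses, not taken from the hunk:

@Override
public void encode(Result value, OutputStream outStream, Context context)
    throws IOException {
  // Nested context: each component is followed by more data in the stream.
  StringUtf8Coder.of().encode(value.filename, outStream, context.nested());
  VarLongCoder.of().encode(value.fileByteSize, outStream, context.nested());
}

@Override
public Result decode(InputStream inStream, Context context) throws IOException {
  String filename = StringUtf8Coder.of().decode(inStream, context.nested());
  long fileByteSize = VarLongCoder.of().decode(inStream, context.nested());
  return new Result(filename, fileByteSize);  // assumed constructor
}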
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index c48955b..9c48b82 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -89,8 +89,8 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
partitions.add(Lists.<String>newArrayList());
currResultsMap.put(tableDestination, partitions);
}
- int currNumFiles = currNumFilesMap.getOrDefault(tableDestination, 0);
- long currSizeBytes = currSizeBytesMap.getOrDefault(tableDestination, 0L);
+ int currNumFiles = getOrDefault(currNumFilesMap, tableDestination, 0);
+ long currSizeBytes = getOrDefault(currSizeBytesMap, tableDestination, 0L);
if (currNumFiles + 1 > Write.MAX_NUM_FILES
|| currSizeBytes + fileResult.fileByteSize > Write.MAX_SIZE_BYTES) {
// Add a new partition for this table.
@@ -117,4 +117,13 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
}
}
}
+
+ private <T> T getOrDefault(Map<TableDestination, T> map, TableDestination tableDestination,
+ T defaultValue) {
+ if (map.containsKey(tableDestination)) {
+ return map.get(tableDestination);
+ } else {
+ return defaultValue;
+ }
+ }
}
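
The private getOrDefault helper introduced above re-implements java.util.Map#getOrDefault, presumably because that default method is Java 8-only and the SDK still needed to build on Java 7. On Java 8 the replaced call and the new helper behave the same for these maps:

// Java 8 standard library:
int currNumFiles = currNumFilesMap.getOrDefault(tableDestination, 0);
// Java 7-compatible helper from this commit:
int currNumFiles7 = getOrDefault(currNumFilesMap, tableDestination, 0);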
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d1ef8e2..f10be13 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -26,17 +28,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.json.GenericJson;
+
import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatistics2;
import com.google.api.services.bigquery.model.JobStatistics4;
@@ -48,7 +42,7 @@ import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -58,9 +52,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
@@ -69,14 +66,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -88,7 +81,6 @@ import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.options.BigQueryOptions;
@@ -122,7 +114,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.PCollectionViews;
@@ -140,6 +131,7 @@ import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -147,10 +139,6 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
/**
* Tests for BigQueryIO.
@@ -158,6 +146,8 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class BigQueryIOTest implements Serializable {
+ private static Path tempFolder;
+
// Table information must be static, as each ParDo will get a separate instance of
// FakeDatasetServices, and they must all modify the same storage.
static com.google.common.collect.Table<String, String, Map<String, TableContainer>>
@@ -169,8 +159,6 @@ public class BigQueryIOTest implements Serializable {
@Rule public transient ExpectedLogs loggedWriteRename = ExpectedLogs.none(WriteRename.class);
@Rule public transient ExpectedLogs loggedWriteTables = ExpectedLogs.none(WriteTables.class);
@Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
- @Mock private transient IOChannelFactory mockIOChannelFactory;
- @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService;
private void checkReadTableObject(
BigQueryIO.Read read, String project, String dataset, String table) {
@@ -227,9 +215,13 @@ public class BigQueryIOTest implements Serializable {
assertEquals(validate, write.getValidate());
}
+ @BeforeClass
+ public static void setupClass() throws IOException {
+ tempFolder = Files.createTempDirectory("BigQueryIOTest");
+ }
+
@Before
public void setUp() throws IOException {
- MockitoAnnotations.initMocks(this);
tables = HashBasedTable.create();
BigQueryIO.clearCreatedTables();
}
@@ -289,29 +281,53 @@ public class BigQueryIOTest implements Serializable {
String tableId = "sometable";
BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
bqOptions.setProject(projectId);
- bqOptions.setTempLocation("gs://testbucket/testdir");
+
+ Path baseDir = Files.createTempDirectory(tempFolder, "testValidateReadSetsDefaultProject");
+ bqOptions.setTempLocation(baseDir.toString());
FakeDatasetService fakeDatasetService = new FakeDatasetService();
fakeDatasetService.createDataset(projectId, datasetId, "", "");
TableReference tableReference =
new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId);
- fakeDatasetService.createTable(new Table().setTableReference(tableReference));
+ fakeDatasetService.createTable(new Table()
+ .setTableReference(tableReference)
+ .setSchema(new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER")))));
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService())
.withDatasetService(fakeDatasetService);
+ List<TableRow> expected = ImmutableList.of(
+ new TableRow().set("name", "a").set("number", 1L),
+ new TableRow().set("name", "b").set("number", 2L),
+ new TableRow().set("name", "c").set("number", 3L),
+ new TableRow().set("name", "d").set("number", 4L),
+ new TableRow().set("name", "e").set("number", 5L),
+ new TableRow().set("name", "f").set("number", 6L));
+ fakeDatasetService.insertAll(tableReference, expected, null);
+
Pipeline p = TestPipeline.create(bqOptions);
TableReference tableRef = new TableReference();
tableRef.setDatasetId(datasetId);
tableRef.setTableId(tableId);
- thrown.expect(RuntimeException.class);
- // Message will be one of following depending on the execution environment.
- thrown.expectMessage(Matchers.containsString("Unsupported"));
- p.apply(BigQueryIO.read().from(tableRef)
- .withTestServices(fakeBqServices));
+ PCollection<KV<String, Long>> output =
+ p.apply(BigQueryIO.read().from(tableRef).withTestServices(fakeBqServices))
+ .apply(ParDo.of(new DoFn<TableRow, KV<String, Long>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(KV.of((String) c.element().get("name"),
+ Long.valueOf((String) c.element().get("number"))));
+ }
+ }));
+ PAssert.that(output).containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L),
+ KV.of("c", 3L), KV.of("d", 4L), KV.of("e", 5L), KV.of("f", 6L)));
+ p.run();
}
@Test
@@ -400,54 +416,32 @@ public class BigQueryIOTest implements Serializable {
FakeDatasetService fakeDatasetService = new FakeDatasetService();
fakeDatasetService.createDataset("non-executing-project", "somedataset", "", "");
fakeDatasetService.createTable(sometable);
- SerializableFunction<Void, Schema> schemaGenerator =
- new SerializableFunction<Void, Schema>() {
- @Override
- public Schema apply(Void input) {
- return BigQueryAvroUtils.toGenericAvroSchema(
- "sometable",
- ImmutableList.of(
- new TableFieldSchema().setName("name").setType("STRING"),
- new TableFieldSchema().setName("number").setType("INTEGER")));
- }
- };
- Collection<Map<String, Object>> records =
- ImmutableList.<Map<String, Object>>builder()
- .add(ImmutableMap.<String, Object>builder().put("name", "a").put("number", 1L).build())
- .add(ImmutableMap.<String, Object>builder().put("name", "b").put("number", 2L).build())
- .add(ImmutableMap.<String, Object>builder().put("name", "c").put("number", 3L).build())
- .build();
- SerializableFunction<GenericJson, Void> onStartJob =
- new WriteExtractFiles(schemaGenerator, records);
+ List<TableRow> records = Lists.newArrayList(
+ new TableRow().set("name", "a").set("number", 1L),
+ new TableRow().set("name", "b").set("number", 2L),
+ new TableRow().set("name", "c").set("number", 3L));
+ fakeDatasetService.insertAll(sometable.getTableReference(), records, null);
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService())
- // .startJobReturns(onStartJob, "done")
- // .pollJobReturns(job)
- // .getJobReturns((Job) null)
- // .verifyExecutingProject(bqOptions.getProject()))
- .withDatasetService(fakeDatasetService)
- .readerReturns(
- toJsonString(new TableRow().set("name", "a").set("number", 1)),
- toJsonString(new TableRow().set("name", "b").set("number", 2)),
- toJsonString(new TableRow().set("name", "c").set("number", 3)));
+ .withDatasetService(fakeDatasetService);
Pipeline p = TestPipeline.create(bqOptions);
- PCollection<String> output = p
+ PCollection<KV<String, Long>> output = p
.apply(BigQueryIO.read().from("non-executing-project:somedataset.sometable")
.withTestServices(fakeBqServices)
.withoutValidation())
- .apply(ParDo.of(new DoFn<TableRow, String>() {
+ .apply(ParDo.of(new DoFn<TableRow, KV<String, Long>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- c.output((String) c.element().get("name"));
+ c.output(KV.of((String) c.element().get("name"),
+ Long.valueOf((String) c.element().get("number"))));
}
}));
PAssert.that(output)
- .containsInAnyOrder(ImmutableList.of("a", "b", "c"));
-
+ .containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L), KV.of("c", 3L)));
p.run();
}
@@ -457,13 +451,12 @@ public class BigQueryIOTest implements Serializable {
bqOptions.setProject("defaultproject");
bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+ FakeDatasetService datasetService = new FakeDatasetService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService())
- // .startJobReturns("done", "done", "done")
- // .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED))
- .withDatasetService(mockDatasetService);
+ .withDatasetService(datasetService);
- mockDatasetService.createDataset("defaultproject", "dataset-id", "", "");
+ datasetService.createDataset("defaultproject", "dataset-id", "", "");
Pipeline p = TestPipeline.create(bqOptions);
p.apply(Create.of(
@@ -715,11 +708,11 @@ public class BigQueryIOTest implements Serializable {
bqOptions.setProject("defaultproject");
bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+ FakeDatasetService datasetService = new FakeDatasetService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
- .withJobService(new FakeJobService());
- // .startJobReturns("done", "done")
- // .pollJobReturns(Status.FAILED, Status.UNKNOWN));
-
+ .withJobService(new FakeJobService())
+ .withDatasetService(datasetService);
+ datasetService.createDataset("project-id", "dataset-id", "", "");
Pipeline p = TestPipeline.create(bqOptions);
p.apply(Create.of(
new TableRow().set("name", "a").set("number", 1),
@@ -732,7 +725,7 @@ public class BigQueryIOTest implements Serializable {
.withoutValidation());
thrown.expect(RuntimeException.class);
- thrown.expectMessage("UNKNOWN status of load job");
+ thrown.expectMessage("Failed to create load job");
try {
p.run();
} finally {
@@ -747,10 +740,10 @@ public class BigQueryIOTest implements Serializable {
bqOptions.setProject("defaultproject");
bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+ FakeDatasetService datasetService = new FakeDatasetService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
- .withJobService(new FakeJobService());
- // .startJobReturns("done", "done", "done")
- // .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED));
+ .withJobService(new FakeJobService())
+ .withDatasetService(datasetService);
Pipeline p = TestPipeline.create(bqOptions);
p.apply(Create.of(
@@ -817,7 +810,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.Read read = BigQueryIO.read()
.from("project:dataset.tableId")
.withTestServices(new FakeBigQueryServices()
- .withDatasetService(mockDatasetService)
+ .withDatasetService(new FakeDatasetService())
.withJobService(new FakeJobService()))
.withoutValidation();
@@ -833,7 +826,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.Read read = BigQueryIO.read()
.fromQuery("foobar")
.withTestServices(new FakeBigQueryServices()
- .withDatasetService(mockDatasetService)
+ .withDatasetService(new FakeDatasetService())
.withJobService(new FakeJobService()))
.withoutValidation();
@@ -874,7 +867,7 @@ public class BigQueryIOTest implements Serializable {
.to("project:dataset.table")
.withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
.withTestServices(new FakeBigQueryServices()
- .withDatasetService(mockDatasetService)
+ .withDatasetService(new FakeDatasetService())
.withJobService(new FakeJobService()))
.withoutValidation();
@@ -1040,9 +1033,7 @@ public class BigQueryIOTest implements Serializable {
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService())
- .withDatasetService(mockDatasetService);
- when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
- new RuntimeException("Unable to confirm BigQuery dataset presence"));
+ .withDatasetService(new FakeDatasetService());
Pipeline p = TestPipeline.create(options);
@@ -1206,26 +1197,31 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBigQueryTableSourceThroughJsonAPI() throws Exception {
+ FakeDatasetService datasetService = new FakeDatasetService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService())
- .readerReturns(
- toJsonString(new TableRow().set("name", "a").set("number", "1")),
- toJsonString(new TableRow().set("name", "b").set("number", "2")),
- toJsonString(new TableRow().set("name", "c").set("number", "3")));
+ .withDatasetService(datasetService);
+ List<TableRow> expected = ImmutableList.of(
+ new TableRow().set("name", "a").set("number", "1"),
+ new TableRow().set("name", "b").set("number", "2"),
+ new TableRow().set("name", "c").set("number", "3"),
+ new TableRow().set("name", "d").set("number", "4"),
+ new TableRow().set("name", "e").set("number", "5"),
+ new TableRow().set("name", "f").set("number", "6"));
+
+ TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+ datasetService.createDataset(table.getProjectId(), table.getDatasetId(), "", "");
+ datasetService.createTable(new Table().setTableReference(table));
+ datasetService.insertAll(table, expected, null);
+
+ Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceThroughJsonAPI");
String jobIdToken = "testJobIdToken";
- TableReference table = BigQueryHelpers.parseTableSpec("project.data_set.table_name");
- String extractDestinationDir = "mock://tempLocation";
BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
- extractDestinationDir, fakeBqServices,
+ baseDir.toString(), fakeBqServices,
StaticValueProvider.of("project"));
- List<TableRow> expected = ImmutableList.of(
- new TableRow().set("name", "a").set("number", "1"),
- new TableRow().set("name", "b").set("number", "2"),
- new TableRow().set("name", "c").set("number", "3"));
-
PipelineOptions options = PipelineOptionsFactory.create();
Assert.assertThat(
SourceTestUtils.readFromSource(bqSource, options),
@@ -1244,43 +1240,48 @@ public class BigQueryIOTest implements Serializable {
extractJob.setStatus(new JobStatus())
.setStatistics(jobStats);
+ FakeDatasetService fakeDatasetService = new FakeDatasetService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService())
- .withDatasetService(mockDatasetService)
- .readerReturns(
- toJsonString(new TableRow().set("name", "a").set("number", "1")),
- toJsonString(new TableRow().set("name", "b").set("number", "2")),
- toJsonString(new TableRow().set("name", "c").set("number", "3")));
+ .withDatasetService(fakeDatasetService);
+
+ List<TableRow> expected = ImmutableList.of(
+ new TableRow().set("name", "a").set("number", 1L),
+ new TableRow().set("name", "b").set("number", 2L),
+ new TableRow().set("name", "c").set("number", 3L),
+ new TableRow().set("name", "d").set("number", 4L),
+ new TableRow().set("name", "e").set("number", 5L),
+ new TableRow().set("name", "f").set("number", 6L));
- String jobIdToken = "testJobIdToken";
TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
- String extractDestinationDir = "mock://tempLocation";
+ fakeDatasetService.createDataset("project", "data_set", "", "");
+ fakeDatasetService.createTable(new Table().setTableReference(table)
+ .setSchema(new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER")))));
+ fakeDatasetService.insertAll(table, expected, null);
+
+ Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceInitSplit");
+
+ String jobIdToken = "testJobIdToken";
+ String extractDestinationDir = baseDir.toString();
BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
extractDestinationDir, fakeBqServices, StaticValueProvider.of("project"));
- List<TableRow> expected = ImmutableList.of(
- new TableRow().set("name", "a").set("number", "1"),
- new TableRow().set("name", "b").set("number", "2"),
- new TableRow().set("name", "c").set("number", "3"));
PipelineOptions options = PipelineOptionsFactory.create();
- options.setTempLocation("mock://tempLocation");
-
- IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
- when(mockIOChannelFactory.resolve(anyString(), anyString()))
- .thenReturn("mock://tempLocation/output");
- when(mockDatasetService.getTable(any(TableReference.class)))
- .thenReturn(new Table().setSchema(new TableSchema()));
+ options.setTempLocation(baseDir.toString());
- Assert.assertThat(
- SourceTestUtils.readFromSource(bqSource, options),
- CoreMatchers.is(expected));
+ List<TableRow> read = SourceTestUtils.readFromSource(bqSource, options);
+ assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
SourceTestUtils.assertSplitAtFractionBehavior(
bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
- assertEquals(1, sources.size());
+ assertEquals(2, sources.size());
BoundedSource<TableRow> actual = sources.get(0);
assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
}
@@ -1306,80 +1307,63 @@ public class BigQueryIOTest implements Serializable {
.setStatistics(extractJobStats);
FakeJobService fakeJobService = new FakeJobService();
+ FakeDatasetService fakeDatasetService = new FakeDatasetService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(fakeJobService)
- .withDatasetService(mockDatasetService)
- .readerReturns(
- toJsonString(new TableRow().set("name", "a").set("number", "1")),
- toJsonString(new TableRow().set("name", "b").set("number", "2")),
- toJsonString(new TableRow().set("name", "c").set("number", "3")));
+ .withDatasetService(fakeDatasetService);
+
+ List<TableRow> expected = ImmutableList.of(
+ new TableRow().set("name", "a").set("number", 1L),
+ new TableRow().set("name", "b").set("number", 2L),
+ new TableRow().set("name", "c").set("number", 3L),
+ new TableRow().set("name", "d").set("number", 4L),
+ new TableRow().set("name", "e").set("number", 5L),
+ new TableRow().set("name", "f").set("number", 6L));
- String jobIdToken = "testJobIdToken";
- String extractDestinationDir = "mock://tempLocation";
TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+ fakeDatasetService.createDataset("project", "data_set", "", "");
+ fakeDatasetService.createTable(new Table()
+ .setTableReference(destinationTable)
+ .setSchema(new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER")))));
+ Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryQuerySourceInitSplit");
+
+ String jobIdToken = "testJobIdToken";
+ String query = FakeBigQueryServices.encodeQuery(expected);
+ String extractDestinationDir = baseDir.toString();
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
- StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
+ StaticValueProvider.of(jobIdToken), StaticValueProvider.of(query),
StaticValueProvider.of(destinationTable),
true /* flattenResults */, true /* useLegacySql */,
extractDestinationDir, fakeBqServices);
- List<TableRow> expected = ImmutableList.of(
- new TableRow().set("name", "a").set("number", "1"),
- new TableRow().set("name", "b").set("number", "2"),
- new TableRow().set("name", "c").set("number", "3"));
-
PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation(extractDestinationDir);
TableReference queryTable = new TableReference()
- .setProjectId("testproject")
- .setDatasetId("testDataset")
- .setTableId("testTable");
- // when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
- // .thenReturn(new JobStatistics().setQuery(
- // new JobStatistics2()
- // .setTotalBytesProcessed(100L)
- // .setReferencedTables(ImmutableList.of(queryTable))));
- fakeJobService.expectDryRunQuery("testproject", "query",
+ .setProjectId("project")
+ .setDatasetId("data_set")
+ .setTableId("table_name");
+
+ fakeJobService.expectDryRunQuery("project", query,
new JobStatistics().setQuery(
new JobStatistics2()
.setTotalBytesProcessed(100L)
.setReferencedTables(ImmutableList.of(queryTable))));
- // when(mockDatasetService.getTable(eq(queryTable)))
- // .thenReturn(new Table().setSchema(new TableSchema()));
- // when(mockDatasetService.getTable(eq(destinationTable)))
- // .thenReturn(new Table().setSchema(new TableSchema()));
- IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
- when(mockIOChannelFactory.resolve(anyString(), anyString()))
- .thenReturn("mock://tempLocation/output");
- //when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
- // .thenReturn(extractJob);
-
- Assert.assertThat(
- SourceTestUtils.readFromSource(bqSource, options),
- CoreMatchers.is(expected));
+ List<TableRow> read = SourceTestUtils.readFromSource(bqSource, options);
+ assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
SourceTestUtils.assertSplitAtFractionBehavior(
bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
+
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
- assertEquals(1, sources.size());
+ assertEquals(2, sources.size());
BoundedSource<TableRow> actual = sources.get(0);
assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
-
- /*
- Mockito.verify(mockJobService)
- .startQueryJob(
- Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
- Mockito.verify(mockJobService)
- .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
- Mockito.verify(mockDatasetService)
- .createDataset(anyString(), anyString(), anyString(), anyString());
- ArgumentCaptor<JobConfigurationQuery> queryConfigArg =
- ArgumentCaptor.forClass(JobConfigurationQuery.class);
- Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture());
- assertEquals(true, queryConfigArg.getValue().getFlattenResults());
- assertEquals(true, queryConfigArg.getValue().getUseLegacySql());*/
}
@Test
@@ -1402,68 +1386,60 @@ public class BigQueryIOTest implements Serializable {
extractJob.setStatus(new JobStatus())
.setStatistics(extractJobStats);
+ FakeDatasetService datasetService = new FakeDatasetService();
+ FakeJobService jobService = new FakeJobService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
- .withJobService(new FakeJobService())
- .withDatasetService(mockDatasetService)
- .readerReturns(
- toJsonString(new TableRow().set("name", "a").set("number", "1")),
- toJsonString(new TableRow().set("name", "b").set("number", "2")),
- toJsonString(new TableRow().set("name", "c").set("number", "3")));
+ .withJobService(jobService)
+ .withDatasetService(datasetService);
- String jobIdToken = "testJobIdToken";
- String extractDestinationDir = "mock://tempLocation";
TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+ List<TableRow> expected = ImmutableList.of(
+ new TableRow().set("name", "a").set("number", 1L),
+ new TableRow().set("name", "b").set("number", 2L),
+ new TableRow().set("name", "c").set("number", 3L),
+ new TableRow().set("name", "d").set("number", 4L),
+ new TableRow().set("name", "e").set("number", 5L),
+ new TableRow().set("name", "f").set("number", 6L));
+ datasetService.createDataset(destinationTable.getProjectId(), destinationTable.getDatasetId(),
+ "", "");
+ Table table = new Table()
+ .setTableReference(destinationTable)
+ .setSchema(new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER"))));
+ datasetService.createTable(table);
+
+ String query = FakeBigQueryServices.encodeQuery(expected);
+ jobService.expectDryRunQuery("project", query,
+ new JobStatistics().setQuery(
+ new JobStatistics2()
+ .setTotalBytesProcessed(100L)
+ .setReferencedTables(ImmutableList.of(table.getTableReference()))));
+
+ Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryNoTableQuerySourceInitSplit");
+ String jobIdToken = "testJobIdToken";
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
- StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
+ StaticValueProvider.of(jobIdToken),
+ StaticValueProvider.of(query),
StaticValueProvider.of(destinationTable),
- true /* flattenResults */, true /* useLegacySql */,
- extractDestinationDir, fakeBqServices);
+ true /* flattenResults */, true /* useLegacySql */, baseDir.toString(), fakeBqServices);
- List<TableRow> expected = ImmutableList.of(
- new TableRow().set("name", "a").set("number", "1"),
- new TableRow().set("name", "b").set("number", "2"),
- new TableRow().set("name", "c").set("number", "3"));
- PipelineOptions options = PipelineOptionsFactory.create();
- options.setTempLocation(extractDestinationDir);
-
- /*
- when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
- .thenReturn(new JobStatistics().setQuery(
- new JobStatistics2()
- .setTotalBytesProcessed(100L)));
- when(mockDatasetService.getTable(eq(destinationTable)))
- .thenReturn(new Table().setSchema(new TableSchema()));
- IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true);
- when(mockIOChannelFactory.resolve(anyString(), anyString()))
- .thenReturn("mock://tempLocation/output");
- when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
- .thenReturn(extractJob);*/
- Assert.assertThat(
- SourceTestUtils.readFromSource(bqSource, options),
- CoreMatchers.is(expected));
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setTempLocation(baseDir.toString());
+ List<TableRow> read = convertBigDecimaslToLong(
+ SourceTestUtils.readFromSource(bqSource, options));
+ assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
SourceTestUtils.assertSplitAtFractionBehavior(
bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
- assertEquals(1, sources.size());
+ assertEquals(2, sources.size());
BoundedSource<TableRow> actual = sources.get(0);
assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
-
- /*
- Mockito.verify(Service)
- .startQueryJob(
- Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
- Mockito.verify(mockJobService)
- .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
- Mockito.verify(mockDatasetService)
- .createDataset(anyString(), anyString(), anyString(), anyString());
- ArgumentCaptor<JobConfigurationQuery> queryConfigArg =
- ArgumentCaptor.forClass(JobConfigurationQuery.class);
- Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture());
- assertEquals(true, queryConfigArg.getValue().getFlattenResults());
- assertEquals(true, queryConfigArg.getValue().getUseLegacySql());*/
}
@Test
@@ -1604,12 +1580,27 @@ public class BigQueryIOTest implements Serializable {
throws Exception {
p.enableAbandonedNodeEnforcement(false);
+ // In the case where a static destination is specified (i.e. not through a dynamic table
+ // function) and there is no input data, WritePartition will generate an empty table. This
+ // code is to test that path.
+ TableReference singletonReference = new TableReference()
+ .setProjectId("projectid")
+ .setDatasetId("dataset")
+ .setTableId("table");
+ String singletonDescription = "singleton";
+ boolean isSingleton = numTables == 1 && numFilesPerTable == 0;
+
List<ShardedKey<TableDestination>> expectedPartitions = Lists.newArrayList();
- for (int i = 0; i < numTables; ++i) {
- for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) {
- String tableName = String.format("project-id:dataset-id.tables%05d", i);
- TableDestination destination = new TableDestination(tableName, tableName);
- expectedPartitions.add(ShardedKey.of(destination, j));
+ if (isSingleton) {
+ expectedPartitions.add(ShardedKey.of(
+ new TableDestination(singletonReference, singletonDescription), 1));
+ } else {
+ for (int i = 0; i < numTables; ++i) {
+ for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) {
+ String tableName = String.format("project-id:dataset-id.tables%05d", i);
+ TableDestination destination = new TableDestination(tableName, tableName);
+ expectedPartitions.add(ShardedKey.of(destination, j));
+ }
}
}
@@ -1642,11 +1633,7 @@ public class BigQueryIOTest implements Serializable {
WriteBundlesToFiles.ResultCoder.of());
ValueProvider<String> singletonTable = null;
- if (numFilesPerTable == 0 && numTables == 1) {
- TableReference singletonReference = new TableReference()
- .setProjectId("projectid")
- .setDatasetId("dataset")
- .setTableId("table");
+ if (isSingleton) {
singletonTable = StaticValueProvider.of(BigQueryHelpers.toJsonString(singletonReference));
}
WritePartition writePartition =
@@ -1680,12 +1667,10 @@ public class BigQueryIOTest implements Serializable {
tableFilesResult.addAll(partition.getValue());
}
- assertEquals(expectedPartitions.size(), partitionsResult.size());
+ assertThat(partitionsResult,
+ containsInAnyOrder(Iterables.toArray(expectedPartitions, ShardedKey.class)));
- // assertThat(partitionsResult,
- // containsInAnyOrder(Iterables.toArray(expectedPartitions, ShardedKey.class)));
-
- if (numFilesPerTable == 0 && numTables == 1) {
+ if (isSingleton) {
assertEquals(1, filesPerTableResult.size());
List<String> singletonFiles = filesPerTableResult.values().iterator().next();
assertTrue(Files.exists(Paths.get(singletonFiles.get(0))));
@@ -1700,15 +1685,11 @@ public class BigQueryIOTest implements Serializable {
public void testWriteTables() throws Exception {
p.enableAbandonedNodeEnforcement(false);
+ FakeDatasetService datasetService = new FakeDatasetService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService())
- // .startJobReturns("done", "done", "done", "done", "done", "done", "done", "done",
- // "done", "done")
- // .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED,
- // Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED,
- // Status.SUCCEEDED, Status.SUCCEEDED))
- .withDatasetService(mockDatasetService);
-
+ .withDatasetService(datasetService);
+ datasetService.createDataset("project-id", "dataset-id", "", "");
long numTables = 3;
long numPartitions = 3;
long numFilesPerPartition = 10;
@@ -1716,6 +1697,8 @@ public class BigQueryIOTest implements Serializable {
String tempFilePrefix = "tempFilePrefix";
Map<TableDestination, List<String>> expectedTempTables = Maps.newHashMap();
+ Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables");
+
List<KV<ShardedKey<TableDestination>, Iterable<List<String>>>> partitions =
Lists.newArrayList();
for (int i = 0; i < numTables; ++i) {
@@ -1726,7 +1709,16 @@ public class BigQueryIOTest implements Serializable {
jobIdToken + "_0x%08x_%05d", tableDestination.hashCode(), j);
List<String> filesPerPartition = Lists.newArrayList();
for (int k = 0; k < numFilesPerPartition; ++k) {
- filesPerPartition.add(String.format("files0x%08x_%05d", tableDestination.hashCode(), k));
+ String filename = Paths.get(baseDir.toString(),
+ String.format("files0x%08x_%05d", tempTableId.hashCode(), k)).toString();
+ try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.TEXT)) {
+ try (OutputStream output = Channels.newOutputStream(channel)) {
+ TableRow tableRow = new TableRow().set("name", tableName);
+ TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
+ output.write("\n".getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ filesPerPartition.add(filename);
}
partitions.add(KV.of(ShardedKey.of(tableDestination, j),
(Iterable<List<String>>) Collections.singleton(filesPerPartition)));
@@ -1814,25 +1806,45 @@ public class BigQueryIOTest implements Serializable {
public void testWriteRename() throws Exception {
p.enableAbandonedNodeEnforcement(false);
+ FakeDatasetService datasetService = new FakeDatasetService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService())
- // .startJobReturns("done", "done")
- // .pollJobReturns(Status.FAILED, Status.SUCCEEDED))
- .withDatasetService(mockDatasetService);
+ .withDatasetService(datasetService);
+ datasetService.createDataset("project-id", "dataset-id", "", "");
- int numFinalTables = 3;
- int numTempTables = 3;
+ final int numFinalTables = 3;
+ final int numTempTablesPerFinalTable = 3;
+ final int numRecordsPerTempTable = 10;
+
+ Map<TableDestination, List<TableRow>> expectedRowsPerTable = Maps.newHashMap();
String jobIdToken = "jobIdToken";
- String jsonTable = "{}";
Map<TableDestination, Iterable<String>> tempTables = Maps.newHashMap();
for (int i = 0; i < numFinalTables; ++i) {
String tableName = "project-id:dataset-id.table_" + i;
- TableDestination tableDestination = new TableDestination(tableName, tableName);
+ TableDestination tableDestination = new TableDestination(
+ tableName, "table_" + i + "_desc");
List<String> tables = Lists.newArrayList();
tempTables.put(tableDestination, tables);
- for (int j = 0; i < numTempTables; ++i) {
- tables.add(String.format(
- "{\"project-id:dataset-id.tableId\":\"%s_%05d_%05d\"}", jobIdToken, i, j));
+
+ List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
+ if (expectedRows == null) {
+ expectedRows = Lists.newArrayList();
+ expectedRowsPerTable.put(tableDestination, expectedRows);
+ }
+ for (int j = 0; j < numTempTablesPerFinalTable; ++j) {
+ TableReference tempTable = new TableReference()
+ .setProjectId("project-id")
+ .setDatasetId("dataset-id")
+ .setTableId(String.format("%s_%05d_%05d", jobIdToken, i, j));
+ datasetService.createTable(new Table().setTableReference(tempTable));
+
+ List<TableRow> rows = Lists.newArrayList();
+ for (int k = 0; k < numRecordsPerTempTable; ++k) {
+ rows.add(new TableRow().set("number", j * numTempTablesPerFinalTable + k));
+ }
+ datasetService.insertAll(tempTable, rows, null);
+ expectedRows.addAll(rows);
+ tables.add(BigQueryHelpers.toJsonString(tempTable));
}
}
@@ -1857,37 +1869,52 @@ public class BigQueryIOTest implements Serializable {
tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
tester.processElement(null);
+
+ for (Map.Entry<TableDestination, Iterable<String>> entry : tempTables.entrySet()) {
+ TableDestination tableDestination = entry.getKey();
+ TableReference tableReference = tableDestination.getTableReference();
+ Table table = checkNotNull(datasetService.getTable(tableReference));
+ assertEquals(tableReference.getTableId() + "_desc", tableDestination.getTableDescription());
+
+ List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
+ assertThat(datasetService.getAllRows(tableReference.getProjectId(),
+ tableReference.getDatasetId(), tableReference.getTableId()),
+ containsInAnyOrder(Iterables.toArray(expectedRows, TableRow.class)));
+
+ // Temp tables should be deleted.
+ for (String tempTableJson : entry.getValue()) {
+ TableReference tempTable = BigQueryHelpers.fromJsonString(
+ tempTableJson, TableReference.class);
+ assertEquals(null, datasetService.getTable(tempTable));
+ }
+ }
}
@Test
public void testRemoveTemporaryTables() throws Exception {
- String projectId = "someproject";
- String datasetId = "somedataset";
- List<String> tables = Lists.newArrayList("table1", "table2", "table3");
+ FakeDatasetService datasetService = new FakeDatasetService();
+ String projectId = "project";
+ String datasetId = "dataset";
+ datasetService.createDataset(projectId, datasetId, "", "");
List<TableReference> tableRefs = Lists.newArrayList(
- BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
- tables.get(0))),
- BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
- tables.get(1))),
- BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
- tables.get(2))));
+ BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table1")),
+ BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table2")),
+ BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table3")));
+ for (TableReference tableRef : tableRefs) {
+ datasetService.createTable(new Table().setTableReference(tableRef));
+ }
- doThrow(new IOException("Unable to delete table"))
- .when(mockDatasetService).deleteTable(tableRefs.get(0));
- doNothing().when(mockDatasetService).deleteTable(tableRefs.get(1));
- doNothing().when(mockDatasetService).deleteTable(tableRefs.get(2));
+ // Add one more table to delete that does not actually exist; removal should tolerate it.
+ tableRefs.add(
+ BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table4")));
- WriteRename.removeTemporaryTables(mockDatasetService, tableRefs);
+ WriteRename.removeTemporaryTables(datasetService, tableRefs);
for (TableReference ref : tableRefs) {
loggedWriteRename.verifyDebug("Deleting table " + toJsonString(ref));
+ checkState(datasetService.getTable(ref) == null,
+ "Table " + ref + " was not deleted!");
}
- loggedWriteRename.verifyWarn("Failed to delete the table "
- + toJsonString(tableRefs.get(0)));
- loggedWriteRename.verifyNotLogged("Failed to delete the table "
- + toJsonString(tableRefs.get(1)));
- loggedWriteRename.verifyNotLogged("Failed to delete the table "
- + toJsonString(tableRefs.get(2)));
}
/** Test options. **/
@@ -1957,43 +1984,6 @@ public class BigQueryIOTest implements Serializable {
}}).length);
}
- private class WriteExtractFiles implements SerializableFunction<GenericJson, Void> {
- private final SerializableFunction<Void, Schema> schemaGenerator;
- private final Collection<Map<String, Object>> records;
-
- private WriteExtractFiles(
- SerializableFunction<Void, Schema> schemaGenerator,
- Collection<Map<String, Object>> records) {
- this.schemaGenerator = schemaGenerator;
- this.records = records;
- }
-
- @Override
- public Void apply(GenericJson input) {
- List<String> destinations = (List<String>) input.get("destinationUris");
- for (String destination : destinations) {
- String newDest = destination.replace("*", "000000000000");
- Schema schema = schemaGenerator.apply(null);
- try (WritableByteChannel channel = IOChannelUtils.create(newDest, MimeTypes.BINARY);
- DataFileWriter<GenericRecord> tableRowWriter =
- new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))
- .create(schema, Channels.newOutputStream(channel))) {
- for (Map<String, Object> record : records) {
- GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schema);
- for (Map.Entry<String, Object> field : record.entrySet()) {
- genericRecordBuilder.set(field.getKey(), field.getValue());
- }
- tableRowWriter.append(genericRecordBuilder.build());
- }
- } catch (IOException e) {
- throw new IllegalStateException(
- String.format("Could not create destination for extract job %s", destination), e);
- }
- }
- return null;
- }
- }
-
@Test
public void testShardedKeyCoderIsSerializableWithWellKnownCoderType() {
CoderProperties.coderSerializable(ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE));
@@ -2013,4 +2003,19 @@ public class BigQueryIOTest implements Serializable {
TableRowInfoCoder.of()),
IntervalWindow.getCoder()));
}
+
+ List<TableRow> convertBigDecimalsToLong(List<TableRow> toConvert) {
+ // The numbers come back as BigDecimal objects after JSON serialization. Change them back to
+ // longs so that we can assert the output.
+ List<TableRow> converted = Lists.newArrayList();
+ for (TableRow entry : toConvert) {
+ TableRow convertedEntry = entry.clone();
+ Object num = convertedEntry.get("number");
+ if (num instanceof BigDecimal) {
+ convertedEntry.set("number", ((BigDecimal) num).longValue());
+ }
+ converted.add(convertedEntry);
+ }
+ return converted;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
index ed3ab37..6dfd9d7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -1,39 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.beam.sdk.io.gcp.bigquery;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
import static org.junit.Assert.assertEquals;
+import com.google.api.client.util.Base64;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.Lists;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.List;
import java.util.NoSuchElementException;
+
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.options.BigQueryOptions;
/**
- * Created by relax on 3/30/17.
+ * A fake implementation of {@link BigQueryServices} for use in tests.
*/
class FakeBigQueryServices implements BigQueryServices {
- private String[] jsonTableRowReturns = new String[0];
private JobService jobService;
- private DatasetService datasetService;
+ private FakeDatasetService datasetService;
- public FakeBigQueryServices withJobService(JobService jobService) {
+ FakeBigQueryServices withJobService(JobService jobService) {
this.jobService = jobService;
return this;
}
- public FakeBigQueryServices withDatasetService(DatasetService datasetService) {
+ FakeBigQueryServices withDatasetService(FakeDatasetService datasetService) {
this.datasetService = datasetService;
return this;
}
- public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) {
- this.jsonTableRowReturns = jsonTableRowReturns;
- return this;
- }
-
@Override
public JobService getJobService(BigQueryOptions bqOptions) {
return jobService;
@@ -45,26 +65,58 @@ class FakeBigQueryServices implements BigQueryServices {
}
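+ // Table reads are served from whatever rows the fake dataset service currently holds.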
@Override
- public BigQueryJsonReader getReaderFromTable(
- BigQueryOptions bqOptions, TableReference tableRef) {
- return new FakeBigQueryReader(jsonTableRowReturns);
+ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) {
+ try {
+ List<TableRow> rows = datasetService.getAllRows(
+ tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId());
+ return new FakeBigQueryReader(rows);
+ } catch (Exception e) {
+ // Fail loudly rather than returning a null reader that would NPE later in the test.
+ throw new RuntimeException(e);
+ }
}
@Override
public BigQueryJsonReader getReaderFromQuery(
BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
- return new FakeBigQueryReader(jsonTableRowReturns);
+ try {
+ List<TableRow> rows = rowsFromEncodedQuery(queryConfig.getQuery());
+ return new FakeBigQueryReader(rows);
+ } catch (IOException e) {
+ // Fail loudly rather than returning a null reader.
+ throw new RuntimeException(e);
+ }
+ }
+
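+ // Decodes a list of rows previously packed into a fake "query" string by encodeQuery().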
+ static List<TableRow> rowsFromEncodedQuery(String query) throws IOException {
+ ListCoder<TableRow> listCoder = ListCoder.of(TableRowJsonCoder.of());
+ ByteArrayInputStream input = new ByteArrayInputStream(Base64.decodeBase64(query));
+ List<TableRow> rows = listCoder.decode(input, Context.OUTER);
+ for (TableRow row : rows) {
+ convertNumbers(row);
+ }
+ return rows;
+ }
+
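+ /**
+ * Encodes rows into an opaque string that tests can pass wherever a SQL query is expected;
+ * {@link #getReaderFromQuery} recovers them via {@link #rowsFromEncodedQuery}. A minimal
+ * sketch of the round trip, assuming {@code fakeBqServices} and {@code bqOptions} are
+ * fixtures already set up by the test:
+ *
+ * <pre>{@code
+ * String query = FakeBigQueryServices.encodeQuery(
+ *     Lists.newArrayList(new TableRow().set("number", 1L)));
+ * BigQueryJsonReader reader = fakeBqServices.getReaderFromQuery(
+ *     bqOptions, "project-id", new JobConfigurationQuery().setQuery(query));
+ * }</pre>
+ */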
+ static String encodeQuery(List<TableRow> rows) throws IOException {
+ ListCoder<TableRow> listCoder = ListCoder.of(TableRowJsonCoder.of());
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ listCoder.encode(rows, output, Context.OUTER);
+ return Base64.encodeBase64String(output.toByteArray());
}
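+ /** A reader that replays a fixed list of rows, held in coder-encoded form. */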
private static class FakeBigQueryReader implements BigQueryJsonReader {
private static final int UNSTARTED = -1;
private static final int CLOSED = Integer.MAX_VALUE;
- private String[] jsonTableRowReturns;
+ private List<byte[]> serializedTableRowReturns;
private int currIndex;
- FakeBigQueryReader(String[] jsonTableRowReturns) {
- this.jsonTableRowReturns = jsonTableRowReturns;
+ FakeBigQueryReader(List<TableRow> tableRowReturns) throws IOException {
+ this.serializedTableRowReturns = Lists.newArrayListWithExpectedSize(tableRowReturns.size());
+ for (TableRow tableRow : tableRowReturns) {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
+ serializedTableRowReturns.add(output.toByteArray());
+ }
this.currIndex = UNSTARTED;
}
@@ -72,20 +124,27 @@ class FakeBigQueryServices implements BigQueryServices {
public boolean start() throws IOException {
assertEquals(UNSTARTED, currIndex);
currIndex = 0;
- return currIndex < jsonTableRowReturns.length;
+ return currIndex < serializedTableRowReturns.size();
}
@Override
public boolean advance() throws IOException {
- return ++currIndex < jsonTableRowReturns.length;
+ return ++currIndex < serializedTableRowReturns.size();
}
@Override
public TableRow getCurrent() throws NoSuchElementException {
- if (currIndex >= jsonTableRowReturns.length) {
+ if (currIndex >= serializedTableRowReturns.size()) {
throw new NoSuchElementException();
}
- return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class);
+
+ ByteArrayInputStream input = new ByteArrayInputStream(
+ serializedTableRowReturns.get(currIndex));
+ try {
+ return convertNumbers(TableRowJsonCoder.of().decode(input, Context.OUTER));
+ } catch (IOException e) {
+ // getCurrent() must not return null; a decoding failure indicates a broken test setup.
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -93,4 +152,15 @@ class FakeBigQueryServices implements BigQueryServices {
currIndex = CLOSED;
}
}
+
+ // JSON deserialization turns small longs back into Integers; restore them to Longs so
+ // that value comparisons in tests succeed.
+ static TableRow convertNumbers(TableRow tableRow) {
+ for (TableRow.Entry entry : tableRow.entrySet()) {
+ if (entry.getValue() instanceof Integer) {
+ entry.setValue(((Integer) entry.getValue()).longValue());
+ }
+ }
+ return tableRow;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index 9b2cf63..5103adb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -1,9 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.beam.sdk.io.gcp.bigquery;
-import static com.google.common.base.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpHeaders;
import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
@@ -24,13 +44,13 @@ class FakeDatasetService implements DatasetService, Serializable {
throws InterruptedException, IOException {
synchronized (BigQueryIOTest.tables) {
Map<String, TableContainer> dataset =
- checkNotNull(
- BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId()),
- "Tried to get a dataset %s:%s from %s, but no such dataset was set",
- tableRef.getProjectId(),
- tableRef.getDatasetId(),
- tableRef.getTableId(),
- FakeDatasetService.class.getSimpleName());
+ BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
+ if (dataset == null) {
+ throwNotFound(
+ "Tried to get a dataset %s:%s from, but no such dataset was set",
+ tableRef.getProjectId(),
+ tableRef.getDatasetId());
+ }
TableContainer tableContainer = dataset.get(tableRef.getTableId());
return tableContainer == null ? null : tableContainer.getTable();
}
@@ -44,27 +64,40 @@ class FakeDatasetService implements DatasetService, Serializable {
}
private TableContainer getTableContainer(String projectId, String datasetId, String tableId)
- throws InterruptedException, IOException {
- synchronized (BigQueryIOTest.tables) {
- Map<String, TableContainer> dataset =
- checkNotNull(
- BigQueryIOTest.tables.get(projectId, datasetId),
- "Tried to get a dataset %s:%s from %s, but no such dataset was set",
- projectId,
- datasetId,
- FakeDatasetService.class.getSimpleName());
- return checkNotNull(dataset.get(tableId),
- "Tried to get a table %s:%s.%s from %s, but no such table was set",
- projectId,
- datasetId,
- tableId,
- FakeDatasetService.class.getSimpleName());
- }
+ throws InterruptedException, IOException {
+ synchronized (BigQueryIOTest.tables) {
+ Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(projectId, datasetId);
+ if (dataset == null) {
+ throwNotFound(
+ "Tried to get a dataset %s:%s, but no such dataset was set",
+ projectId,
+ datasetId);
+ }
+ TableContainer tableContainer = dataset.get(tableId);
+ if (tableContainer == null) {
+ throwNotFound(
+ "Tried to get a table %s:%s.%s, but no such table was set",
+ projectId,
+ datasetId,
+ tableId);
+ }
+ return tableContainer;
+ }
}
@Override
public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
- throw new UnsupportedOperationException("Unsupported");
+ synchronized (BigQueryIOTest.tables) {
+ Map<String, TableContainer> dataset =
+ BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
+ if (dataset == null) {
+ throwNotFound(
+ "Tried to get a dataset %s:%s, but no such table was set",
+ tableRef.getProjectId(),
+ tableRef.getDatasetId());
+ }
+ dataset.remove(tableRef.getTableId());
+ }
}
@@ -73,13 +106,13 @@ class FakeDatasetService implements DatasetService, Serializable {
TableReference tableReference = table.getTableReference();
synchronized (BigQueryIOTest.tables) {
Map<String, TableContainer> dataset =
- checkNotNull(
- BigQueryIOTest.tables.get(tableReference.getProjectId(),
- tableReference.getDatasetId()),
- "Tried to get a dataset %s:%s from %s, but no such table was set",
- tableReference.getProjectId(),
- tableReference.getDatasetId(),
- FakeDatasetService.class.getSimpleName());
+ BigQueryIOTest.tables.get(tableReference.getProjectId(), tableReference.getDatasetId());
+ if (dataset == null) {
+ throwNotFound(
+ "Tried to get a dataset %s:%s, but no such table was set",
+ tableReference.getProjectId(),
+ tableReference.getDatasetId());
+ }
TableContainer tableContainer = dataset.get(tableReference.getTableId());
if (tableContainer == null) {
tableContainer = new TableContainer(table);
@@ -98,7 +131,16 @@ class FakeDatasetService implements DatasetService, Serializable {
@Override
public Dataset getDataset(
String projectId, String datasetId) throws IOException, InterruptedException {
- throw new UnsupportedOperationException("Unsupported");
+ synchronized (BigQueryIOTest.tables) {
+ Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(projectId, datasetId);
+ if (dataset == null) {
+ throwNotFound("Tried to get a dataset %s:%s, but no such table was set",
+ projectId, datasetId);
+ }
+ return new Dataset().setDatasetReference(new DatasetReference()
+ .setDatasetId(datasetId)
+ .setProjectId(projectId));
+ }
}
@Override
@@ -117,7 +159,9 @@ class FakeDatasetService implements DatasetService, Serializable {
@Override
public void deleteDataset(String projectId, String datasetId)
throws IOException, InterruptedException {
- throw new UnsupportedOperationException("Unsupported");
+ synchronized (BigQueryIOTest.tables) {
+ BigQueryIOTest.tables.remove(projectId, datasetId);
+ }
}
@Override
@@ -138,8 +182,7 @@ class FakeDatasetService implements DatasetService, Serializable {
TableContainer tableContainer = getTableContainer(
ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
for (int i = 0; i < rowList.size(); ++i) {
- tableContainer.addRow(rowList.get(i), insertIdList.get(i));
- dataSize += rowList.get(i).toString().length();
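+ // addRow returns the encoded size of the row, so insertAll can report total bytes inserted.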
+ dataSize += tableContainer.addRow(rowList.get(i), insertIdList.get(i));
}
return dataSize;
}
@@ -150,23 +193,16 @@ class FakeDatasetService implements DatasetService, Serializable {
@Nullable String tableDescription)
throws IOException, InterruptedException {
synchronized (BigQueryIOTest.tables) {
- Map<String, TableContainer> dataset =
- checkNotNull(
- BigQueryIOTest.tables.get(tableReference.getProjectId(),
- tableReference.getDatasetId()),
- "Tried to get a dataset %s:%s from %s, but no such dataset was set",
- tableReference.getProjectId(),
- tableReference.getDatasetId(),
- tableReference.getTableId(),
- FakeDatasetService.class.getSimpleName());
- TableContainer tableContainer = checkNotNull(dataset.get(tableReference.getTableId()),
- "Tried to patch a table %s:%s.%s from %s, but no such table was set",
- tableReference.getProjectId(),
- tableReference.getDatasetId(),
- tableReference.getTableId(),
- FakeDatasetService.class.getSimpleName());
+ TableContainer tableContainer = getTableContainer(tableReference.getProjectId(),
+ tableReference.getDatasetId(), tableReference.getTableId());
tableContainer.getTable().setDescription(tableDescription);
return tableContainer.getTable();
}
}
+
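+ /** Simulates a BigQuery "not found" response: an {@link IOException} wrapping a 404. */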
+ void throwNotFound(String format, Object... args) throws IOException {
+ throw new IOException(
+ new GoogleJsonResponseException.Builder(404,
+ String.format(format, args), new HttpHeaders()).build());
+ }
}