You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/29 05:10:30 UTC
[1/3] beam git commit: [BEAM-1022] Add testing coverage for BigQuery streaming writes
Repository: beam
Updated Branches:
refs/heads/release-0.4.0 482016037 -> e77080a4d
[BEAM-1022] Add testing coverage for BigQuery streaming writes
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c53a332e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c53a332e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c53a332e
Branch: refs/heads/release-0.4.0
Commit: c53a332e23bc6a6441a0d52c57c9fbe5e00171b5
Parents: 4820160
Author: Reuven Lax <re...@google.com>
Authored: Thu Nov 17 10:57:41 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 14:22:11 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 48 +-
.../sdk/io/gcp/bigquery/BigQueryServices.java | 7 +-
.../io/gcp/bigquery/BigQueryServicesImpl.java | 121 ++++-
.../io/gcp/bigquery/BigQueryTableInserter.java | 217 ---------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 456 +++++++++++++++----
.../gcp/bigquery/BigQueryServicesImplTest.java | 139 +++++-
.../gcp/bigquery/BigQueryTableInserterTest.java | 245 ----------
.../sdk/io/gcp/bigquery/BigQueryUtilTest.java | 50 +-
8 files changed, 655 insertions(+), 628 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 0be8567..28049ed 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -24,7 +24,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.json.JsonFactory;
-import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
@@ -33,6 +32,7 @@ import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
@@ -1796,8 +1796,8 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Bound withCreateDisposition(CreateDisposition createDisposition) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, validate, bigQueryServices);
+ return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
+ createDisposition, writeDisposition, validate, bigQueryServices);
}
/**
@@ -1806,8 +1806,8 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Bound withWriteDisposition(WriteDisposition writeDisposition) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, validate, bigQueryServices);
+ return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
+ createDisposition, writeDisposition, validate, bigQueryServices);
}
/**
@@ -2136,7 +2136,8 @@ public class BigQueryIO {
/** Returns the table reference, or {@code null}. */
@Nullable
public ValueProvider<TableReference> getTable() {
- return NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+ // Return null instead of wrapping an absent jsonTableRef in a NestedValueProvider.
+ return jsonTableRef == null ? null :
+ NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
}
/** Returns {@code true} if table validation is enabled. */
@@ -2550,6 +2551,13 @@ public class BigQueryIO {
}
}
+ /**
+ * Clears the cache of table specs that {@code StreamingWriteFn} has already created or found
+ * to exist, so that later writes check BigQuery for those tables again. Used for testing.
+ */
+ @VisibleForTesting
+ static void clearCreatedTables() {
+ StreamingWriteFn.clearCreatedTables();
+ }
/////////////////////////////////////////////////////////////////////////////
/**
@@ -2585,6 +2593,15 @@ public class BigQueryIO {
this.bqServices = checkNotNull(bqServices, "bqServices");
}
+ /**
+ * Clears the cache of table specs already created or found to exist. Used for testing.
+ * Synchronizes on {@code createdTables}, the same lock taken by getOrCreateTable.
+ */
+ private static void clearCreatedTables() {
+ synchronized (createdTables) {
+ createdTables.clear();
+ }
+ }
+
/** Prepares a target BigQuery table. */
@StartBundle
public void startBundle(Context context) {
@@ -2626,20 +2643,25 @@ public class BigQueryIO {
}
public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
- throws IOException {
+ throws InterruptedException, IOException {
TableReference tableReference = parseTableSpec(tableSpec);
if (!createdTables.contains(tableSpec)) {
synchronized (createdTables) {
// Another thread may have succeeded in creating the table in the meanwhile, so
// check again. This check isn't needed for correctness, but we add it to prevent
// every thread from attempting a create and overwhelming our BigQuery quota.
+ DatasetService datasetService = bqServices.getDatasetService(options);
if (!createdTables.contains(tableSpec)) {
- TableSchema tableSchema = JSON_FACTORY.fromString(
- jsonTableSchema.get(), TableSchema.class);
- Bigquery client = Transport.newBigQueryClient(options).build();
- BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
- inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND,
- Write.CreateDisposition.CREATE_IF_NEEDED, tableSchema);
+ Table table = datasetService.getTable(
+ tableReference.getProjectId(),
+ tableReference.getDatasetId(),
+ tableReference.getTableId());
+ if (table == null) {
+ TableSchema tableSchema = JSON_FACTORY.fromString(
+ jsonTableSchema.get(), TableSchema.class);
+ datasetService.createTable(
+ new Table().setTableReference(tableReference).setSchema(tableSchema));
+ }
createdTables.add(tableSpec);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 07dc06e..8ca473d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -114,12 +114,17 @@ interface BigQueryServices extends Serializable {
*/
interface DatasetService {
/**
- * Gets the specified {@link Table} resource by table ID.
+ * Gets the specified {@link Table} resource by table ID or {@code null} if no table exists.
*/
Table getTable(String projectId, String datasetId, String tableId)
throws InterruptedException, IOException;
/**
+ * Creates the specified table if it does not exist. An already-existing table with the same
+ * reference is not an error and is left unchanged.
+ */
+ void createTable(Table table) throws InterruptedException, IOException;
+
+ /**
* Deletes the table specified by tableId from the dataset.
* If the table contains data, all the data will be deleted.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 61f1a1a..4eb8e7b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -23,6 +23,7 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Dataset;
@@ -53,10 +54,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Transport;
import org.joda.time.Duration;
@@ -281,7 +284,8 @@ class BigQueryServicesImpl implements BigQueryServices {
"Unable to dry run query: %s, aborting after %d retries.",
queryConfig, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff).getStatistics();
+ backoff,
+ ALWAYS_RETRY).getStatistics();
}
/**
@@ -400,7 +404,80 @@ class BigQueryServicesImpl implements BigQueryServices {
"Unable to get table: %s, aborting after %d retries.",
tableId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff);
+ backoff,
+ DONT_RETRY_NOT_FOUND);
+ }
+
+ /**
+ * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the
+ * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
+ * configured with a table spec function to use different tables for each window.
+ */
+ private static final int RETRY_CREATE_TABLE_DURATION_MILLIS =
+ (int) TimeUnit.MINUTES.toMillis(5);
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>If a table with the same name already exists in the dataset, the function simply
+ * returns. In such a case, the existing table doesn't necessarily have the same schema as
+ * specified by the parameter.
+ *
+ * <p>Requests rejected for rate limiting are retried with exponential backoff for up to
+ * {@code RETRY_CREATE_TABLE_DURATION_MILLIS} (5 minutes).
+ *
+ * @throws IOException if an error other than an already-existing table occurs, if rate
+ * limiting persists past the retry window, or if the backoff wait is interrupted (in
+ * which case the thread's interrupt flag is restored before throwing)
+ */
+ @Override
+ public void createTable(Table table) throws InterruptedException, IOException {
+ LOG.info("Trying to create BigQuery table: {}",
+ BigQueryIO.toTableSpec(table.getTableReference()));
+ BackOff backoff =
+ new ExponentialBackOff.Builder()
+ .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
+ .build();
+
+ tryCreateTable(table, backoff, Sleeper.DEFAULT);
+ }
+
+ /**
+ * Inserts {@code table}, retrying with {@code backoff} while the request is rate limited.
+ *
+ * @return the created table, or {@code null} if a table with the same reference already
+ * exists
+ * @throws IOException for any other error, when {@code backoff} is exhausted, or when the
+ * backoff sleep is interrupted (the thread's interrupt flag is restored first)
+ */
+ @VisibleForTesting
+ @Nullable
+ Table tryCreateTable(Table table, BackOff backoff, Sleeper sleeper)
+ throws IOException {
+ // Tracks whether the quota message has been logged, so it is emitted only once per call.
+ boolean retry = false;
+ while (true) {
+ try {
+ return client.tables().insert(
+ table.getTableReference().getProjectId(),
+ table.getTableReference().getDatasetId(),
+ table).execute();
+ } catch (IOException e) {
+ ApiErrorExtractor extractor = new ApiErrorExtractor();
+ if (extractor.itemAlreadyExists(e)) {
+ // The table already exists, nothing to return.
+ return null;
+ } else if (extractor.rateLimited(e)) {
+ // The request failed because we hit a temporary quota. Back off and try again.
+ try {
+ if (BackOffUtils.next(sleeper, backoff)) {
+ if (!retry) {
+ LOG.info(
+ "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
+ table.getTableReference().getProjectId(),
+ table.getTableReference().getDatasetId(),
+ table.getTableReference().getTableId(),
+ TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
+ retry = true;
+ }
+ continue;
+ }
+ } catch (InterruptedException e1) {
+ // Restore interrupted state and throw the last failure.
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ }
+ // Not rate limited, or the backoff is exhausted: propagate the original failure.
+ throw e;
+ }
+ }
}
/**
@@ -422,7 +499,8 @@ class BigQueryServicesImpl implements BigQueryServices {
"Unable to delete table: %s, aborting after %d retries.",
tableId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff);
+ backoff,
+ ALWAYS_RETRY);
}
@Override
@@ -437,7 +515,8 @@ class BigQueryServicesImpl implements BigQueryServices {
"Unable to list table data: %s, aborting after %d retries.",
tableId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff);
+ backoff,
+ ALWAYS_RETRY);
return dataList.getRows() == null || dataList.getRows().isEmpty();
}
@@ -460,7 +539,8 @@ class BigQueryServicesImpl implements BigQueryServices {
"Unable to get dataset: %s, aborting after %d retries.",
datasetId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff);
+ backoff,
+ DONT_RETRY_NOT_FOUND);
}
/**
@@ -543,7 +623,8 @@ class BigQueryServicesImpl implements BigQueryServices {
"Unable to delete table: %s, aborting after %d retries.",
datasetId, MAX_RPC_RETRIES),
Sleeper.DEFAULT,
- backoff);
+ backoff,
+ ALWAYS_RETRY);
}
@VisibleForTesting
@@ -684,8 +765,8 @@ class BigQueryServicesImpl implements BigQueryServices {
public long insertAll(
TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
throws IOException, InterruptedException {
- return insertAll(
- ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+ return insertAll(
+ ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
}
}
@@ -747,12 +828,31 @@ class BigQueryServicesImpl implements BigQueryServices {
}
}
+ /**
+ * Retry predicate for {@code executeWithRetries}: retries every failure except those that
+ * {@code ApiErrorExtractor.itemNotFound} classifies as "not found".
+ */
+ static final SerializableFunction<IOException, Boolean> DONT_RETRY_NOT_FOUND =
+ new SerializableFunction<IOException, Boolean>() {
+ @Override
+ public Boolean apply(IOException input) {
+ ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+ return !errorExtractor.itemNotFound(input);
+ }
+ };
+
+ /** Retry predicate for {@code executeWithRetries} that retries every failure. */
+ static final SerializableFunction<IOException, Boolean> ALWAYS_RETRY =
+ new SerializableFunction<IOException, Boolean>() {
+ @Override
+ public Boolean apply(IOException input) {
+ return true;
+ }
+ };
+
+
@VisibleForTesting
static <T> T executeWithRetries(
AbstractGoogleClientRequest<T> request,
String errorMessage,
Sleeper sleeper,
- BackOff backoff)
+ BackOff backoff,
+ SerializableFunction<IOException, Boolean> shouldRetry)
throws IOException, InterruptedException {
Exception lastException = null;
do {
@@ -761,6 +861,9 @@ class BigQueryServicesImpl implements BigQueryServices {
} catch (IOException e) {
LOG.warn("Ignore the error and retry the request.", e);
lastException = e;
+ if (!shouldRetry.apply(e)) {
+ break;
+ }
}
} while (nextBackOff(sleeper, backoff));
throw new IOException(
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
deleted file mode 100644
index a64dc9f..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.ExponentialBackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Inserts rows into BigQuery.
- */
-class BigQueryTableInserter {
- private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class);
-
- private final Bigquery client;
-
- /**
- * Constructs a new row inserter.
- *
- * @param client a BigQuery client
- * @param options a PipelineOptions object
- */
- BigQueryTableInserter(Bigquery client, PipelineOptions options) {
- this.client = client;
- }
-
- /**
- * Retrieves or creates the table.
- *
- * <p>The table is checked to conform to insertion requirements as specified
- * by WriteDisposition and CreateDisposition.
- *
- * <p>If table truncation is requested (WriteDisposition.WRITE_TRUNCATE), then
- * this will re-create the table if necessary to ensure it is empty.
- *
- * <p>If an empty table is required (WriteDisposition.WRITE_EMPTY), then this
- * will fail if the table exists and is not empty.
- *
- * <p>When constructing a table, a {@code TableSchema} must be available. If a
- * schema is provided, then it will be used. If no schema is provided, but
- * an existing table is being cleared (WRITE_TRUNCATE option above), then
- * the existing schema will be re-used. If no schema is available, then an
- * {@code IOException} is thrown.
- */
- Table getOrCreateTable(
- TableReference ref,
- WriteDisposition writeDisposition,
- CreateDisposition createDisposition,
- @Nullable TableSchema schema) throws IOException {
- // Check if table already exists.
- Bigquery.Tables.Get get = client.tables()
- .get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- Table table = null;
- try {
- table = get.execute();
- } catch (IOException e) {
- ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
- if (!errorExtractor.itemNotFound(e)
- || createDisposition != CreateDisposition.CREATE_IF_NEEDED) {
- // Rethrow.
- throw e;
- }
- }
-
- // If we want an empty table, and it isn't, then delete it first.
- if (table != null) {
- if (writeDisposition == WriteDisposition.WRITE_APPEND) {
- return table;
- }
-
- boolean empty = isEmpty(ref);
- if (empty) {
- if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
- LOG.info("Empty table found, not removing {}", BigQueryIO.toTableSpec(ref));
- }
- return table;
-
- } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
- throw new IOException("WriteDisposition is WRITE_EMPTY, "
- + "but table is not empty");
- }
-
- // Reuse the existing schema if none was provided.
- if (schema == null) {
- schema = table.getSchema();
- }
-
- // Delete table and fall through to re-creating it below.
- LOG.info("Deleting table {}", BigQueryIO.toTableSpec(ref));
- Bigquery.Tables.Delete delete = client.tables()
- .delete(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- delete.execute();
- }
-
- if (schema == null) {
- throw new IllegalArgumentException(
- "Table schema required for new table.");
- }
-
- // Create the table.
- return tryCreateTable(ref, schema);
- }
-
- /**
- * Checks if a table is empty.
- */
- private boolean isEmpty(TableReference ref) throws IOException {
- Bigquery.Tabledata.List list = client.tabledata()
- .list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
- list.setMaxResults(1L);
- TableDataList dataList = list.execute();
-
- return dataList.getRows() == null || dataList.getRows().isEmpty();
- }
-
- /**
- * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the
- * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
- * configured with a table spec function to use different tables for each window.
- */
- private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int) TimeUnit.MINUTES.toMillis(5);
-
- /**
- * Tries to create the BigQuery table.
- * If a table with the same name already exists in the dataset, the table
- * creation fails, and the function returns null. In such a case,
- * the existing table doesn't necessarily have the same schema as specified
- * by the parameter.
- *
- * @param schema Schema of the new BigQuery table.
- * @return The newly created BigQuery table information, or null if the table
- * with the same name already exists.
- * @throws IOException if other error than already existing table occurs.
- */
- @Nullable
- private Table tryCreateTable(TableReference ref, TableSchema schema) throws IOException {
- LOG.info("Trying to create BigQuery table: {}", BigQueryIO.toTableSpec(ref));
- BackOff backoff =
- new ExponentialBackOff.Builder()
- .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
- .build();
-
- Table table = new Table().setTableReference(ref).setSchema(schema);
- return tryCreateTable(table, ref.getProjectId(), ref.getDatasetId(), backoff, Sleeper.DEFAULT);
- }
-
- @VisibleForTesting
- @Nullable
- Table tryCreateTable(
- Table table, String projectId, String datasetId, BackOff backoff, Sleeper sleeper)
- throws IOException {
- boolean retry = false;
- while (true) {
- try {
- return client.tables().insert(projectId, datasetId, table).execute();
- } catch (IOException e) {
- ApiErrorExtractor extractor = new ApiErrorExtractor();
- if (extractor.itemAlreadyExists(e)) {
- // The table already exists, nothing to return.
- return null;
- } else if (extractor.rateLimited(e)) {
- // The request failed because we hit a temporary quota. Back off and try again.
- try {
- if (BackOffUtils.next(sleeper, backoff)) {
- if (!retry) {
- LOG.info(
- "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
- projectId,
- datasetId,
- table.getTableReference().getTableId(),
- TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
- retry = true;
- }
- continue;
- }
- } catch (InterruptedException e1) {
- // Restore interrupted state and throw the last failure.
- Thread.currentThread().interrupt();
- throw e;
- }
- }
- throw e;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 54ec2bb..b78316f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -23,11 +23,13 @@ import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.fromJsonString;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
@@ -58,18 +60,20 @@ import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Table.Cell;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -85,6 +89,8 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -133,6 +139,9 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
@@ -146,6 +155,7 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
+import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@@ -478,39 +488,81 @@ public class BigQueryIOTest implements Serializable {
}
}
- /** A fake dataset service that can be serialized, for use in testReadFromTable. */
- private static class FakeDatasetService implements DatasetService, Serializable {
- private com.google.common.collect.Table<String, String, Map<String, Table>> tables =
- HashBasedTable.create();
-
- public FakeDatasetService withTable(
- String projectId, String datasetId, String tableId, Table table) throws IOException {
- Map<String, Table> dataset = tables.get(projectId, datasetId);
- if (dataset == null) {
- dataset = new HashMap<>();
- tables.put(projectId, datasetId, dataset);
- }
- dataset.put(tableId, table);
+ /**
+ * In-memory contents of one fake BigQuery table: its metadata, the rows inserted into it,
+ * and the insert id supplied with each row (kept index-aligned with {@code rows}).
+ */
+ private static class TableContainer {
+ Table table;
+ List<TableRow> rows;
+ List<String> ids;
+
+ TableContainer(Table table) {
+ this.table = table;
+ this.rows = new ArrayList<>();
+ this.ids = new ArrayList<>();
+ }
+
+ TableContainer addRow(TableRow row, String id) {
+ rows.add(row);
+ ids.add(id);
return this;
}
+ Table getTable() {
+ return table;
+ }
+
+ List<TableRow> getRows() {
+ return rows;
+ }
+ }
+
+ // Table information must be static, as each ParDo will get a separate instance of
+ // FakeDatasetService, and they must all modify the same storage. The fake's methods
+ // synchronize on this object. NOTE(review): this state outlives a single test; confirm
+ // it is reset between tests.
+ private static com.google.common.collect.Table<String, String, Map<String, TableContainer>>
+ tables = HashBasedTable.create();
+
+ /** A fake dataset service that can be serialized, for use in testReadFromTable. */
+ private static class FakeDatasetService implements DatasetService, Serializable {
+
@Override
public Table getTable(String projectId, String datasetId, String tableId)
throws InterruptedException, IOException {
- Map<String, Table> dataset =
- checkNotNull(
- tables.get(projectId, datasetId),
- "Tried to get a table %s:%s.%s from %s, but no such table was set",
- projectId,
- datasetId,
- tableId,
- FakeDatasetService.class.getSimpleName());
- return checkNotNull(dataset.get(tableId),
- "Tried to get a table %s:%s.%s from %s, but no such table was set",
- projectId,
- datasetId,
- tableId,
- FakeDatasetService.class.getSimpleName());
+ synchronized (tables) {
+ // A missing dataset is a test-setup error; a missing table is reported as null, matching
+ // the DatasetService.getTable contract.
+ Map<String, TableContainer> dataset =
+ checkNotNull(
+ tables.get(projectId, datasetId),
+ "Tried to get table %s:%s.%s from %s, but no such dataset was set",
+ projectId,
+ datasetId,
+ tableId,
+ FakeDatasetService.class.getSimpleName());
+ TableContainer tableContainer = dataset.get(tableId);
+ return tableContainer == null ? null : tableContainer.getTable();
+ }
+ }
+
+ /**
+ * Returns a snapshot of the rows inserted into the given table so far.
+ */
+ public List<TableRow> getAllRows(String projectId, String datasetId, String tableId)
+ throws InterruptedException, IOException {
+ synchronized (tables) {
+ // Copy under the lock so callers never iterate the live list while insertAll appends.
+ return new ArrayList<>(getTableContainer(projectId, datasetId, tableId).getRows());
+ }
+ }
+
+ /**
+ * Returns the container for an existing table, failing via checkNotNull if either the
+ * dataset or the table has not been created.
+ */
+ private TableContainer getTableContainer(String projectId, String datasetId, String tableId)
+ throws InterruptedException, IOException {
+ synchronized (tables) {
+ Map<String, TableContainer> dataset =
+ checkNotNull(
+ tables.get(projectId, datasetId),
+ "Tried to get a dataset %s:%s from %s, but no such dataset was set",
+ projectId,
+ datasetId,
+ FakeDatasetService.class.getSimpleName());
+ return checkNotNull(dataset.get(tableId),
+ "Tried to get a table %s:%s.%s from %s, but no such table was set",
+ projectId,
+ datasetId,
+ tableId,
+ FakeDatasetService.class.getSimpleName());
+ }
}
@Override
@@ -519,6 +571,26 @@ public class BigQueryIOTest implements Serializable {
throw new UnsupportedOperationException("Unsupported");
}
+
+ @Override
+ public void createTable(Table table) throws IOException {
+ TableReference tableReference = table.getTableReference();
+ synchronized (tables) {
+ Map<String, TableContainer> dataset =
+ checkNotNull(
+ tables.get(tableReference.getProjectId(), tableReference.getDatasetId()),
+ "Tried to create table %s:%s.%s in %s, but no such dataset was set",
+ tableReference.getProjectId(),
+ tableReference.getDatasetId(),
+ tableReference.getTableId(),
+ FakeDatasetService.class.getSimpleName());
+ // Like the real service, an existing table is kept as-is rather than replaced.
+ TableContainer tableContainer = dataset.get(tableReference.getTableId());
+ if (tableContainer == null) {
+ tableContainer = new TableContainer(table);
+ dataset.put(tableReference.getTableId(), tableContainer);
+ }
+ }
+ }
+
@Override
public boolean isTableEmpty(String projectId, String datasetId, String tableId)
throws IOException, InterruptedException {
@@ -536,7 +608,13 @@ public class BigQueryIOTest implements Serializable {
public void createDataset(
String projectId, String datasetId, String location, String description)
throws IOException, InterruptedException {
- throw new UnsupportedOperationException("Unsupported");
+ // location and description are ignored; creating an existing dataset is a no-op.
+ synchronized (tables) {
+ Map<String, TableContainer> dataset = tables.get(projectId, datasetId);
+ if (dataset == null) {
+ dataset = new HashMap<>();
+ tables.put(projectId, datasetId, dataset);
+ }
+ }
}
@Override
@@ -549,55 +627,18 @@ public class BigQueryIOTest implements Serializable {
public long insertAll(
TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
throws IOException, InterruptedException {
- throw new UnsupportedOperationException("Unsupported");
- }
-
- ////////////////////////////////// SERIALIZATION METHODS ////////////////////////////////////
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.writeObject(replaceTablesWithBytes(this.tables));
- }
-
- private com.google.common.collect.Table<String, String, Map<String, byte[]>>
- replaceTablesWithBytes(
- com.google.common.collect.Table<String, String, Map<String, Table>> toCopy)
- throws IOException {
- com.google.common.collect.Table<String, String, Map<String, byte[]>> copy =
- HashBasedTable.create();
- for (Cell<String, String, Map<String, Table>> cell : toCopy.cellSet()) {
- HashMap<String, byte[]> dataset = new HashMap<>();
- copy.put(cell.getRowKey(), cell.getColumnKey(), dataset);
- for (Map.Entry<String, Table> dsTables : cell.getValue().entrySet()) {
- dataset.put(
- dsTables.getKey(), Transport.getJsonFactory().toByteArray(dsTables.getValue()));
+ synchronized (tables) {
+ assertEquals(rowList.size(), insertIdList.size()); // NOTE(review): insertIdList is @Nullable but is dereferenced here.
+ // Store each row together with its insert id in the target fake table.
+ long dataSize = 0;
+ TableContainer tableContainer = getTableContainer(
+ ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+ for (int i = 0; i < rowList.size(); ++i) {
+ tableContainer.addRow(rowList.get(i), insertIdList.get(i));
+ dataSize += rowList.get(i).toString().length(); // Approximate size: length of the row's toString().
}
+ return dataSize;
}
- return copy;
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- com.google.common.collect.Table<String, String, Map<String, byte[]>> tablesTable =
- (com.google.common.collect.Table<String, String, Map<String, byte[]>>) in.readObject();
- this.tables = replaceBytesWithTables(tablesTable);
- }
-
- private com.google.common.collect.Table<String, String, Map<String, Table>>
- replaceBytesWithTables(
- com.google.common.collect.Table<String, String, Map<String, byte[]>> tablesTable)
- throws IOException {
- com.google.common.collect.Table<String, String, Map<String, Table>> copy =
- HashBasedTable.create();
- for (Cell<String, String, Map<String, byte[]>> cell : tablesTable.cellSet()) {
- HashMap<String, Table> dataset = new HashMap<>();
- copy.put(cell.getRowKey(), cell.getColumnKey(), dataset);
- for (Map.Entry<String, byte[]> dsTables : cell.getValue().entrySet()) {
- Table table =
- Transport.getJsonFactory()
- .createJsonParser(new ByteArrayInputStream(dsTables.getValue()))
- .parse(Table.class);
- dataset.put(dsTables.getKey(), table);
- }
- }
- return copy;
}
}
@@ -658,6 +699,8 @@ public class BigQueryIOTest implements Serializable {
@Before
public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
+ tables = HashBasedTable.create(); // Every test starts with an empty fake BigQuery.
+ BigQueryIO.clearCreatedTables(); // NOTE(review): presumably resets BigQueryIO's record of tables it already created; confirm.
}
@Test
@@ -716,8 +759,11 @@ public class BigQueryIOTest implements Serializable {
bqOptions.setProject(projectId);
bqOptions.setTempLocation("gs://testbucket/testdir");
- FakeDatasetService fakeDatasetService =
- new FakeDatasetService().withTable(projectId, datasetId, tableId, null);
+ FakeDatasetService fakeDatasetService = new FakeDatasetService();
+ fakeDatasetService.createDataset(projectId, datasetId, "", "");
+ TableReference tableReference =
+ new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId);
+ fakeDatasetService.createTable(new Table().setTableReference(tableReference));
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService())
@@ -765,7 +811,7 @@ public class BigQueryIOTest implements Serializable {
p.apply("ReadMyTable",
BigQueryIO.Read
.from("foo.com:project:somedataset.sometable")
- .fromQuery("query"));
+ .fromQuery("query"));
p.run();
}
@@ -829,7 +875,7 @@ public class BigQueryIOTest implements Serializable {
@Test
@Category(NeedsRunner.class)
- public void testReadFromTable() throws IOException {
+ public void testReadFromTable() throws IOException, InterruptedException {
BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
bqOptions.setProject("defaultProject");
bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
@@ -850,10 +896,15 @@ public class BigQueryIOTest implements Serializable {
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER"))));
+ sometable.setTableReference(
+ new TableReference()
+ .setProjectId("non-executing-project")
+ .setDatasetId("somedataset")
+ .setTableId("sometable"));
sometable.setNumBytes(1024L * 1024L);
- FakeDatasetService fakeDatasetService =
- new FakeDatasetService()
- .withTable("non-executing-project", "somedataset", "sometable", sometable);
+ FakeDatasetService fakeDatasetService = new FakeDatasetService();
+ fakeDatasetService.createDataset("non-executing-project", "somedataset", "", "");
+ fakeDatasetService.createTable(sometable);
SerializableFunction<Void, Schema> schemaGenerator =
new SerializableFunction<Void, Schema>() {
@Override
@@ -945,6 +996,216 @@ public class BigQueryIOTest implements Serializable {
@Test
@Category(NeedsRunner.class)
+ public void testStreamingWrite() throws Exception {
+ BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+ bqOptions.setProject("defaultProject");
+ bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+ FakeDatasetService datasetService = new FakeDatasetService();
+ datasetService.createDataset("project-id", "dataset-id", "", "");
+ FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+ .withDatasetService(datasetService);
+
+ Pipeline p = TestPipeline.create(bqOptions);
+ p.apply(Create.of(
+ new TableRow().set("name", "a").set("number", 1),
+ new TableRow().set("name", "b").set("number", 2),
+ new TableRow().set("name", "c").set("number", 3),
+ new TableRow().set("name", "d").set("number", 4))
+ .withCoder(TableRowJsonCoder.of()))
+ .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) // Presumably selects BigQueryIO's streaming-insert path.
+ .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withSchema(new TableSchema().setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER"))))
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+ p.run();
+
+ // Only the dataset was pre-created, so CREATE_IF_NEEDED must have created table-id.
+ assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ containsInAnyOrder(
+ new TableRow().set("name", "a").set("number", 1),
+ new TableRow().set("name", "b").set("number", 2),
+ new TableRow().set("name", "c").set("number", 3),
+ new TableRow().set("name", "d").set("number", 4)));
+ }
+
+ /**
+ * A generic window function that allows partitioning data into windows by a string value.
+ *
+ * <p>Logically, creates multiple global windows, and the user provides a function that
+ * decides which global window a value should go into.
+ */
+ private static class PartitionedGlobalWindows extends
+
+ NonMergingWindowFn<TableRow, PartitionedGlobalWindow> {
+ private SerializableFunction<TableRow, String> extractPartition;
+
+ public PartitionedGlobalWindows(SerializableFunction<TableRow, String> extractPartition) {
+ this.extractPartition = extractPartition;
+ }
+
+ @Override
+ public Collection<PartitionedGlobalWindow> assignWindows(AssignContext c) {
+ return Collections.singletonList(new PartitionedGlobalWindow(
+ extractPartition.apply(c.element())));
+ }
+
+ @Override
+ public boolean isCompatible(WindowFn<?, ?> o) {
+ return o instanceof PartitionedGlobalWindows;
+ }
+
+ @Override
+ public Coder<PartitionedGlobalWindow> windowCoder() {
+ return new PartitionedGlobalWindowCoder();
+ }
+
+ @Override
+ public PartitionedGlobalWindow getSideInputWindow(BoundedWindow window) {
+ throw new UnsupportedOperationException(
+ "PartitionedGlobalWindows is not allowed in side inputs");
+ }
+
+ @Override
+ public Instant getOutputTime(Instant inputTimestamp, PartitionedGlobalWindow window) {
+ return inputTimestamp;
+ }
+ }
+
+ /**
+ * Window identified by a String partition value; shares the global window's max timestamp.
+ */
+ private static class PartitionedGlobalWindow extends BoundedWindow {
+ final String value; // Read directly by PartitionedGlobalWindowCoder and the test's table function.
+
+ public PartitionedGlobalWindow(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public Instant maxTimestamp() {
+ return GlobalWindow.INSTANCE.maxTimestamp();
+ }
+
+ // The following methods are only needed due to BEAM-1022. Once this issue is fixed, we will
+ // no longer need these.
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof PartitionedGlobalWindow) {
+ return value.equals(((PartitionedGlobalWindow) other).value);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return value.hashCode();
+ }
+ }
+
+ /**
+ * Coder for {@link PartitionedGlobalWindow}, delegating to {@link StringUtf8Coder}.
+ */
+ private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> {
+ @Override
+ public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
+ throws IOException, CoderException {
+ StringUtf8Coder.of().encode(window.value, outStream, context);
+ }
+
+ @Override
+ public PartitionedGlobalWindow decode(InputStream inStream, Context context)
+ throws IOException, CoderException {
+ return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context));
+ }
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testStreamingWriteWithWindowFn() throws Exception {
+ BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+ bqOptions.setProject("defaultProject");
+ bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+ FakeDatasetService datasetService = new FakeDatasetService();
+ datasetService.createDataset("project-id", "dataset-id", "", "");
+ FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+ .withDatasetService(datasetService);
+
+ List<TableRow> inserts = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ inserts.add(new TableRow().set("name", "number" + i).set("number", i));
+ }
+
+ // Create a windowing strategy that puts the input into five different windows depending on
+ // record value.
+ WindowFn<TableRow, PartitionedGlobalWindow> window = new PartitionedGlobalWindows(
+ new SerializableFunction<TableRow, String>() {
+ @Override
+ public String apply(TableRow value) {
+ // Rows may round-trip through TableRowJsonCoder, so don't rely on "number" being
+ // exactly an Integer; accept any numeric type and fail clearly otherwise.
+ Object number = value.get("number");
+ if (!(number instanceof Number)) {
+ fail("Expected a numeric 'number' field in " + value);
+ }
+ return Integer.toString(((Number) number).intValue() % 5);
+ }
+ }
+ );
+
+ SerializableFunction<BoundedWindow, String> tableFunction =
+ new SerializableFunction<BoundedWindow, String>() {
+ @Override
+ public String apply(BoundedWindow input) {
+ return "project-id:dataset-id.table-id-" + ((PartitionedGlobalWindow) input).value;
+ }
+ };
+
+ Pipeline p = TestPipeline.create(bqOptions);
+ p.apply(Create.of(inserts).withCoder(TableRowJsonCoder.of()))
+ .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
+ .apply(Window.<TableRow>into(window))
+ .apply(BigQueryIO.Write
+ .to(tableFunction)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withSchema(new TableSchema().setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER"))))
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+ p.run();
+
+ // Each table-id-<k> should hold exactly the two rows whose number % 5 == k.
+ assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-0"),
+ containsInAnyOrder(
+ new TableRow().set("name", "number0").set("number", 0),
+ new TableRow().set("name", "number5").set("number", 5)));
+ assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-1"),
+ containsInAnyOrder(
+ new TableRow().set("name", "number1").set("number", 1),
+ new TableRow().set("name", "number6").set("number", 6)));
+ assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-2"),
+ containsInAnyOrder(
+ new TableRow().set("name", "number2").set("number", 2),
+ new TableRow().set("name", "number7").set("number", 7)));
+ assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-3"),
+ containsInAnyOrder(
+ new TableRow().set("name", "number3").set("number", 3),
+ new TableRow().set("name", "number8").set("number", 8)));
+ assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-4"),
+ containsInAnyOrder(
+ new TableRow().set("name", "number4").set("number", 4),
+ new TableRow().set("name", "number9").set("number", 9)));
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
public void testWriteUnknown() throws Exception {
BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
bqOptions.setProject("defaultProject");
@@ -1031,7 +1292,8 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWrite() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
+ BigQueryIO.Write.Bound bound =
+ BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
checkWriteObject(
bound, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -1980,32 +2242,42 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- public void testRuntimeOptionsNotCalledInApplyInputTable() {
+ public void testRuntimeOptionsNotCalledInApplyInputTable() throws IOException {
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- bqOptions.setTempLocation("gs://testbucket/testdir");
+ bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); // Local temp folder instead of a gs:// path.
+ FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+ .withJobService(new FakeJobService()); // Shared by the Read and the Write below.
Pipeline pipeline = TestPipeline.create(options);
pipeline
- .apply(BigQueryIO.Read.from(options.getInputTable()).withoutValidation())
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.Read
+ .from(options.getInputTable()).withoutValidation()
+ .withTestServices(fakeBqServices))
+ .apply(BigQueryIO.Write
.to(options.getOutputTable())
.withSchema(NestedValueProvider.of(
options.getOutputSchema(), new JsonSchemaToTableSchema()))
+ .withTestServices(fakeBqServices)
.withoutValidation());
}
@Test
- public void testRuntimeOptionsNotCalledInApplyInputQuery() {
+ public void testRuntimeOptionsNotCalledInApplyInputQuery() throws IOException {
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- bqOptions.setTempLocation("gs://testbucket/testdir");
+ bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); // Local temp folder instead of a gs:// path.
+ FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+ .withJobService(new FakeJobService()); // Shared by the Read and the Write below.
Pipeline pipeline = TestPipeline.create(options);
pipeline
- .apply(BigQueryIO.Read.fromQuery(options.getInputQuery()).withoutValidation())
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.Read
+ .fromQuery(options.getInputQuery()).withoutValidation()
+ .withTestServices(fakeBqServices))
+ .apply(BigQueryIO.Write
.to(options.getOutputTable())
.withSchema(NestedValueProvider.of(
options.getOutputSchema(), new JsonSchemaToTableSchema()))
+ .withTestServices(fakeBqServices)
.withoutValidation());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 0e76660..10ed8bd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -17,9 +17,11 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static com.google.common.base.Verify.verifyNotNull;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
@@ -47,9 +49,12 @@ import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
+import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.cloud.hadoop.util.RetryBoundedBackOff;
import com.google.common.collect.ImmutableList;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -64,6 +69,7 @@ import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Transport;
+import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -325,7 +331,8 @@ public class BigQueryServicesImplTest {
bigquery.tables().get("projectId", "datasetId", "tableId"),
"Failed to get table.",
Sleeper.DEFAULT,
- BackOff.STOP_BACKOFF);
+ BackOff.STOP_BACKOFF,
+ BigQueryServicesImpl.ALWAYS_RETRY);
assertEquals(testTable, table);
verify(response, times(1)).getStatusCode();
@@ -358,6 +365,11 @@ public class BigQueryServicesImplTest {
verify(response, times(2)).getContentType();
expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
}
+ // A BackOff that makes a total of 4 attempts
+ private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(1)) // Keep retries fast in tests.
+ .withExponent(1) // No exponential growth of the delay between attempts.
+ .withMaxRetries(3);
/**
* Tests that {@link DatasetServiceImpl#insertAll} retries selected rows on failure.
@@ -371,7 +383,8 @@ public class BigQueryServicesImplTest {
List<String> insertIds = ImmutableList.of("a", "b");
final TableDataInsertAllResponse bFailed = new TableDataInsertAllResponse()
- .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));
+ .setInsertErrors(ImmutableList.of(
+ new InsertErrors().setIndex(1L).setErrors(ImmutableList.of(new ErrorProto()))));
final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();
@@ -389,9 +402,6 @@ public class BigQueryServicesImplTest {
expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
}
- // A BackOff that makes a total of 4 attempts
- private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(3);
-
/**
* Tests that {@link DatasetServiceImpl#insertAll} fails gracefully when persistent issues.
*/
@@ -490,9 +500,128 @@ public class BigQueryServicesImplTest {
GoogleJsonError error = new GoogleJsonError();
error.setErrors(ImmutableList.of(info));
error.setCode(status);
+ error.setMessage(reason);
// The actual JSON response is an error container.
GoogleJsonErrorContainer container = new GoogleJsonErrorContainer();
container.setError(error);
return container;
}
+
+ @Test
+ public void testCreateTableSucceeds() throws IOException {
+ TableReference ref =
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ Table testTable = new Table().setTableReference(ref);
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContent()).thenReturn(toStream(testTable));
+ // A single 200 response succeeds immediately, so a zero-retry backoff is enough.
+ BigQueryServicesImpl.DatasetServiceImpl services =
+ new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+ Table ret =
+ services.tryCreateTable(
+ testTable,
+ new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
+ Sleeper.DEFAULT);
+ assertEquals(testTable, ret);
+ verify(response, times(1)).getStatusCode();
+ verify(response, times(1)).getContent();
+ verify(response, times(1)).getContentType();
+ }
+
+ /**
+ * Tests that {@link BigQueryServicesImpl} does not retry non-rate-limited attempts.
+ */
+ @Test
+ public void testCreateTableDoesNotRetry() throws IOException {
+ TableReference ref =
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ Table testTable = new Table().setTableReference(ref);
+ // First response is 403 not-rate-limited, second response has valid payload but should not
+ // be invoked.
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+ when(response.getContent())
+ .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
+ .thenReturn(toStream(testTable));
+
+ thrown.expect(GoogleJsonResponseException.class);
+ thrown.expectMessage("actually forbidden");
+
+ BigQueryServicesImpl.DatasetServiceImpl services =
+ new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+ try {
+ services.tryCreateTable(
+ testTable,
+ new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
+ Sleeper.DEFAULT);
+ fail(); // Not reached: the non-rate-limited 403 must be rethrown without a retry.
+ } catch (IOException e) {
+ verify(response, times(1)).getStatusCode();
+ verify(response, times(1)).getContent();
+ verify(response, times(1)).getContentType();
+ throw e;
+ }
+ }
+
+ /**
+ * Tests that table creation succeeds when the table already exists.
+ */
+ @Test
+ public void testCreateTableSucceedsAlreadyExists() throws IOException {
+ TableReference ref =
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ TableSchema schema = new TableSchema().setFields(ImmutableList.of(
+ new TableFieldSchema().setName("column1").setType("String"),
+ new TableFieldSchema().setName("column2").setType("Integer")));
+ Table testTable = new Table().setTableReference(ref).setSchema(schema);
+
+ when(response.getStatusCode()).thenReturn(409); // 409 means already exists
+
+ BigQueryServicesImpl.DatasetServiceImpl services =
+ new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+ Table ret =
+ services.tryCreateTable(
+ testTable,
+ new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
+ Sleeper.DEFAULT);
+ // The 409 must not surface as an exception; tryCreateTable returns null instead.
+ assertNull(ret);
+ verify(response, times(1)).getStatusCode();
+ verify(response, times(1)).getContent();
+ verify(response, times(1)).getContentType();
+ }
+
+ /**
+ * Tests that {@link BigQueryServicesImpl} retries quota rate limited attempts.
+ */
+ @Test
+ public void testCreateTableRetry() throws IOException {
+ TableReference ref =
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ Table testTable = new Table().setTableReference(ref);
+
+ // First response is 403 rate limited, second response has valid payload.
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+ when(response.getContent())
+ .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+ .thenReturn(toStream(testTable));
+ // Up to 3 retries with no delay, so the single rate-limited response is retried once.
+ BigQueryServicesImpl.DatasetServiceImpl services =
+ new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+ Table ret =
+ services.tryCreateTable(
+ testTable,
+ new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
+ Sleeper.DEFAULT);
+ assertEquals(testTable, ret);
+ verify(response, times(2)).getStatusCode();
+ verify(response, times(2)).getContent();
+ verify(response, times(2)).getContentType();
+ verifyNotNull(ret.getTableReference());
+ expectedLogs.verifyInfo(
+ "Quota limit reached when creating table project:dataset.table, "
+ + "retrying up to 5.0 minutes");
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
deleted file mode 100644
index fb79c74..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import static com.google.common.base.Verify.verifyNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.googleapis.json.GoogleJsonError;
-import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
-import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.json.Json;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.cloud.hadoop.util.RetryBoundedBackOff;
-import com.google.common.collect.ImmutableList;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-import org.apache.beam.sdk.util.Transport;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests of {@link BigQueryTableInserter}.
- */
-@RunWith(JUnit4.class)
-public class BigQueryTableInserterTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
- @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryTableInserter.class);
- @Mock private LowLevelHttpResponse response;
- private Bigquery bigquery;
- private PipelineOptions options;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- // A mock transport that lets us mock the API responses.
- MockHttpTransport transport =
- new MockHttpTransport.Builder()
- .setLowLevelHttpRequest(
- new MockLowLevelHttpRequest() {
- @Override
- public LowLevelHttpResponse execute() throws IOException {
- return response;
- }
- })
- .build();
-
- // A sample BigQuery API client that uses default JsonFactory and RetryHttpInitializer.
- bigquery =
- new Bigquery.Builder(
- transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())
- .build();
-
- options = PipelineOptionsFactory.create();
- }
-
- @After
- public void tearDown() throws IOException {
- // These three interactions happen for every request in the normal response parsing.
- verify(response, atLeastOnce()).getContentEncoding();
- verify(response, atLeastOnce()).getHeaderCount();
- verify(response, atLeastOnce()).getReasonPhrase();
- verifyNoMoreInteractions(response);
- }
-
- /** A helper to wrap a {@link GenericJson} object in a content stream. */
- private static InputStream toStream(GenericJson content) throws IOException {
- return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));
- }
-
- /** A helper that generates the error JSON payload that Google APIs produce. */
- private static GoogleJsonErrorContainer errorWithReasonAndStatus(String reason, int status) {
- ErrorInfo info = new ErrorInfo();
- info.setReason(reason);
- info.setDomain("global");
- // GoogleJsonError contains one or more ErrorInfo objects; our utiities read the first one.
- GoogleJsonError error = new GoogleJsonError();
- error.setErrors(ImmutableList.of(info));
- error.setCode(status);
- // The actual JSON response is an error container.
- GoogleJsonErrorContainer container = new GoogleJsonErrorContainer();
- container.setError(error);
- return container;
- }
-
- /**
- * Tests that {@link BigQueryTableInserter} succeeds on the first try.
- */
- @Test
- public void testCreateTableSucceeds() throws IOException {
- Table testTable = new Table().setDescription("a table");
-
- when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
- when(response.getStatusCode()).thenReturn(200);
- when(response.getContent()).thenReturn(toStream(testTable));
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
- Table ret =
- inserter.tryCreateTable(
- new Table(),
- "project",
- "dataset",
- new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
- Sleeper.DEFAULT);
- assertEquals(testTable, ret);
- verify(response, times(1)).getStatusCode();
- verify(response, times(1)).getContent();
- verify(response, times(1)).getContentType();
- }
-
- /**
- * Tests that {@link BigQueryTableInserter} succeeds when the table already exists.
- */
- @Test
- public void testCreateTableSucceedsAlreadyExists() throws IOException {
- when(response.getStatusCode()).thenReturn(409); // 409 means already exists
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
- Table ret =
- inserter.tryCreateTable(
- new Table(),
- "project",
- "dataset",
- new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
- Sleeper.DEFAULT);
-
- assertNull(ret);
- verify(response, times(1)).getStatusCode();
- verify(response, times(1)).getContent();
- verify(response, times(1)).getContentType();
- }
-
- /**
- * Tests that {@link BigQueryTableInserter} retries quota rate limited attempts.
- */
- @Test
- public void testCreateTableRetry() throws IOException {
- TableReference ref =
- new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- Table testTable = new Table().setTableReference(ref);
-
- // First response is 403 rate limited, second response has valid payload.
- when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
- when(response.getStatusCode()).thenReturn(403).thenReturn(200);
- when(response.getContent())
- .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
- .thenReturn(toStream(testTable));
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
- Table ret =
- inserter.tryCreateTable(
- testTable,
- "project",
- "dataset",
- new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
- Sleeper.DEFAULT);
- assertEquals(testTable, ret);
- verify(response, times(2)).getStatusCode();
- verify(response, times(2)).getContent();
- verify(response, times(2)).getContentType();
- verifyNotNull(ret.getTableReference());
- expectedLogs.verifyInfo(
- "Quota limit reached when creating table project:dataset.table, "
- + "retrying up to 5.0 minutes");
- }
-
- /**
- * Tests that {@link BigQueryTableInserter} does not retry non-rate-limited attempts.
- */
- @Test
- public void testCreateTableDoesNotRetry() throws IOException {
- Table testTable = new Table().setDescription("a table");
-
- // First response is 403 not-rate-limited, second response has valid payload but should not
- // be invoked.
- when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
- when(response.getStatusCode()).thenReturn(403).thenReturn(200);
- when(response.getContent())
- .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
- .thenReturn(toStream(testTable));
-
- thrown.expect(GoogleJsonResponseException.class);
- thrown.expectMessage("actually forbidden");
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
- try {
- inserter.tryCreateTable(
- new Table(),
- "project",
- "dataset",
- new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
- Sleeper.DEFAULT);
- fail();
- } catch (IOException e) {
- verify(response, times(1)).getStatusCode();
- verify(response, times(1)).getContent();
- verify(response, times(1)).getContentType();
- throw e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c53a332e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index e539b33..8130238 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -361,60 +361,18 @@ public class BigQueryUtilTest {
}
@Test
- public void testWriteAppend() throws IOException {
- onTableGet(basicTableSchema());
-
- TableReference ref = BigQueryIO
- .parseTableSpec("project:dataset.table");
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
-
- inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_APPEND,
- BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
-
- verifyTableGet();
- }
-
- @Test
- public void testWriteEmpty() throws IOException {
+ public void testTableGet() throws InterruptedException, IOException {
onTableGet(basicTableSchema());
TableDataList dataList = new TableDataList().setTotalRows(0L);
onTableList(dataList);
- TableReference ref = BigQueryIO
- .parseTableSpec("project:dataset.table");
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
+ BigQueryServicesImpl.DatasetServiceImpl services =
+ new BigQueryServicesImpl.DatasetServiceImpl(mockClient, options);
- inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
- BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
+ services.getTable("project", "dataset", "table");
verifyTableGet();
- verifyTabledataList();
- }
-
- @Test
- public void testWriteEmptyFail() throws IOException {
- thrown.expect(IOException.class);
-
- onTableGet(basicTableSchema());
-
- TableDataList dataList = rawDataList(rawRow("Arthur", 42));
- onTableList(dataList);
-
- TableReference ref = BigQueryIO
- .parseTableSpec("project:dataset.table");
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
-
- try {
- inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
- BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
- } finally {
- verifyTableGet();
- verifyTabledataList();
- }
}
@Test
[3/3] beam git commit: This closes #1710
Posted by dh...@apache.org.
This closes #1710
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e77080a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e77080a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e77080a4
Branch: refs/heads/release-0.4.0
Commit: e77080a4d4358d0149d3c1e554581a254ea26506
Parents: 4820160 b1459de
Author: Dan Halperin <dh...@google.com>
Authored: Wed Dec 28 21:10:23 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 21:10:23 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 71 ++-
.../sdk/io/gcp/bigquery/BigQueryServices.java | 7 +-
.../io/gcp/bigquery/BigQueryServicesImpl.java | 121 ++++-
.../io/gcp/bigquery/BigQueryTableInserter.java | 217 ---------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 484 +++++++++++++++----
.../gcp/bigquery/BigQueryServicesImplTest.java | 139 +++++-
.../gcp/bigquery/BigQueryTableInserterTest.java | 245 ----------
.../sdk/io/gcp/bigquery/BigQueryUtilTest.java | 50 +-
8 files changed, 696 insertions(+), 638 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: BigQueryIO: fix streaming write, typo in API
Posted by dh...@apache.org.
BigQueryIO: fix streaming write, typo in API
and improve testing
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b1459dea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b1459dea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b1459dea
Branch: refs/heads/release-0.4.0
Commit: b1459deacfc7077b49803b9460c5127cd4fb74bf
Parents: c53a332
Author: Sam McVeety <sg...@google.com>
Authored: Fri Dec 16 18:10:28 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 14:22:34 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 23 +++++--
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 72 ++++++++++++--------
2 files changed, 63 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b1459dea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 28049ed..7bb1e51 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -370,7 +370,8 @@ public class BigQueryIO {
}
}
- private static class TableSpecToTableRef
+ @VisibleForTesting
+ static class TableSpecToTableRef
implements SerializableFunction<String, TableReference> {
@Override
public TableReference apply(String from) {
@@ -807,6 +808,7 @@ public class BigQueryIO {
/**
* Returns the query to be read, or {@code null} if reading from a table instead.
*/
+ @Nullable
public String getQuery() {
return query == null ? null : query.get();
}
@@ -814,7 +816,8 @@ public class BigQueryIO {
/**
* Returns the query to be read, or {@code null} if reading from a table instead.
*/
- public ValueProvider<String> getQueryProivder() {
+ @Nullable
+ public ValueProvider<String> getQueryProvider() {
return query;
}
@@ -2813,7 +2816,8 @@ public class BigQueryIO {
* a randomUUID is generated only once per bucket of data. The actual unique
* id is created by concatenating this randomUUID with a sequential number.
*/
- private static class TagWithUniqueIdsAndTable
+ @VisibleForTesting
+ static class TagWithUniqueIdsAndTable
extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> {
/** TableSpec to write to. */
private final ValueProvider<String> tableSpec;
@@ -2830,8 +2834,12 @@ public class BigQueryIO {
checkArgument(table == null ^ tableRefFunction == null,
"Exactly one of table or tableRefFunction should be set");
if (table != null) {
- if (table.isAccessible() && table.get().getProjectId() == null) {
- table.get().setProjectId(options.as(BigQueryOptions.class).getProject());
+ if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
+ TableReference tableRef = table.get()
+ .setProjectId(options.as(BigQueryOptions.class).getProject());
+ table = NestedValueProvider.of(
+ StaticValueProvider.of(toJsonString(tableRef)),
+ new JsonTableRefToTableRef());
}
this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
} else {
@@ -2870,6 +2878,11 @@ public class BigQueryIO {
}
}
+ @VisibleForTesting
+ ValueProvider<String> getTableSpec() {
+ return tableSpec;
+ }
+
private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) {
if (tableSpec != null) {
return tableSpec.get();
http://git-wip-us.apache.org/repos/asf/beam/blob/b1459dea/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index b78316f..dc566d2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,6 +26,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -2242,43 +2243,60 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- public void testRuntimeOptionsNotCalledInApplyInputTable() throws IOException {
+ public void testRuntimeOptionsNotCalledInApplyInputTable() {
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
- FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
- .withJobService(new FakeJobService());
+ bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- pipeline
- .apply(BigQueryIO.Read
- .from(options.getInputTable()).withoutValidation()
- .withTestServices(fakeBqServices))
- .apply(BigQueryIO.Write
- .to(options.getOutputTable())
- .withSchema(NestedValueProvider.of(
- options.getOutputSchema(), new JsonSchemaToTableSchema()))
- .withTestServices(fakeBqServices)
- .withoutValidation());
+ BigQueryIO.Read.Bound read = BigQueryIO.Read.from(
+ options.getInputTable()).withoutValidation();
+ pipeline.apply(read);
+ // Test that this doesn't throw.
+ DisplayData.from(read);
}
@Test
- public void testRuntimeOptionsNotCalledInApplyInputQuery() throws IOException {
+ public void testRuntimeOptionsNotCalledInApplyInputQuery() {
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
- FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
- .withJobService(new FakeJobService());
+ bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
+ BigQueryIO.Read.Bound read = BigQueryIO.Read.fromQuery(
+ options.getInputQuery()).withoutValidation();
+ pipeline.apply(read);
+ // Test that this doesn't throw.
+ DisplayData.from(read);
+ }
+
+ @Test
+ public void testRuntimeOptionsNotCalledInApplyOutput() {
+ RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ bqOptions.setTempLocation("gs://testbucket/testdir");
+ Pipeline pipeline = TestPipeline.create(options);
+ BigQueryIO.Write.Bound write = BigQueryIO.Write
+ .to(options.getOutputTable())
+ .withSchema(NestedValueProvider.of(
+ options.getOutputSchema(), new JsonSchemaToTableSchema()))
+ .withoutValidation();
pipeline
- .apply(BigQueryIO.Read
- .fromQuery(options.getInputQuery()).withoutValidation()
- .withTestServices(fakeBqServices))
- .apply(BigQueryIO.Write
- .to(options.getOutputTable())
- .withSchema(NestedValueProvider.of(
- options.getOutputSchema(), new JsonSchemaToTableSchema()))
- .withTestServices(fakeBqServices)
- .withoutValidation());
+ .apply(Create.<TableRow>of())
+ .apply(write);
+ // Test that this doesn't throw.
+ DisplayData.from(write);
+ }
+
+ @Test
+ public void testTagWithUniqueIdsAndTableProjectNotNullWithNvp() {
+ BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+ bqOptions.setProject("project");
+ BigQueryIO.TagWithUniqueIdsAndTable tag =
+ new BigQueryIO.TagWithUniqueIdsAndTable(
+ bqOptions, NestedValueProvider.of(
+ StaticValueProvider.of("data_set.table_name"),
+ new BigQueryIO.TableSpecToTableRef()), null);
+ TableReference table = BigQueryIO.parseTableSpec(tag.getTableSpec().get());
+ assertNotNull(table.getProjectId());
}
private static void testNumFiles(File tempDir, int expectedNumFiles) {