You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/29 16:22:00 UTC
[14/50] beam git commit: Refactor BigQueryServices to have TableReference in method signatures
Refactor BigQueryServices to have TableReference in method signatures
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9d1d682
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9d1d682
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9d1d682
Branch: refs/heads/python-sdk
Commit: f9d1d682340fa3083bc18723605bf3d0aa6d76cd
Parents: e77de7c
Author: Pei He <pe...@google.com>
Authored: Tue Jan 24 16:45:16 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jan 24 18:00:40 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 40 +++++--------------
.../sdk/io/gcp/bigquery/BigQueryServices.java | 9 ++---
.../io/gcp/bigquery/BigQueryServicesImpl.java | 23 ++++-------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 41 ++++++++------------
.../sdk/io/gcp/bigquery/BigQueryUtilTest.java | 3 +-
5 files changed, 40 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index fa49f55..b6f9fb0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -997,8 +997,7 @@ public class BigQueryIO {
TableReference table = JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
- .getTable(table.getProjectId(), table.getDatasetId(), table.getTableId())
- .getNumBytes();
+ .getTable(table).getNumBytes();
tableSizeBytes.compareAndSet(null, numBytes);
}
return tableSizeBytes.get();
@@ -1088,10 +1087,7 @@ public class BigQueryIO {
DatasetService tableService = bqServices.getDatasetService(bqOptions);
if (referencedTables != null && !referencedTables.isEmpty()) {
TableReference queryTable = referencedTables.get(0);
- location = tableService.getTable(
- queryTable.getProjectId(),
- queryTable.getDatasetId(),
- queryTable.getTableId()).getLocation();
+ location = tableService.getTable(queryTable).getLocation();
}
// 2. Create the temporary dataset in the query location.
@@ -1120,10 +1116,7 @@ public class BigQueryIO {
JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
DatasetService tableService = bqServices.getDatasetService(bqOptions);
- tableService.deleteTable(
- tableToRemove.getProjectId(),
- tableToRemove.getDatasetId(),
- tableToRemove.getTableId());
+ tableService.deleteTable(tableToRemove);
tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
}
@@ -1227,10 +1220,8 @@ public class BigQueryIO {
String extractJobId = getExtractJobId(jobIdToken);
List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
- TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable(
- tableToExtract.getProjectId(),
- tableToExtract.getDatasetId(),
- tableToExtract.getTableId()).getSchema();
+ TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
+ .getTable(tableToExtract).getSchema();
cleanupTempResource(bqOptions);
return createSources(tempFiles, tableSchema);
@@ -1867,13 +1858,9 @@ public class BigQueryIO {
DatasetService datasetService,
TableReference tableRef) {
try {
- if (datasetService.getTable(
- tableRef.getProjectId(),
- tableRef.getDatasetId(),
- tableRef.getTableId()) != null) {
+ if (datasetService.getTable(tableRef) != null) {
checkState(
- datasetService.isTableEmpty(
- tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
+ datasetService.isTableEmpty(tableRef),
"BigQuery table is not empty: %s.",
BigQueryIO.toTableSpec(tableRef));
}
@@ -2535,10 +2522,7 @@ public class BigQueryIO {
for (TableReference tableRef : tempTables) {
try {
LOG.debug("Deleting table {}", toJsonString(tableRef));
- tableService.deleteTable(
- tableRef.getProjectId(),
- tableRef.getDatasetId(),
- tableRef.getTableId());
+ tableService.deleteTable(tableRef);
} catch (Exception e) {
LOG.warn("Failed to delete the table {}", toJsonString(tableRef), e);
}
@@ -2587,7 +2571,7 @@ public class BigQueryIO {
private static void verifyTablePresence(DatasetService datasetService, TableReference table) {
try {
- datasetService.getTable(table.getProjectId(), table.getDatasetId(), table.getTableId());
+ datasetService.getTable(table);
} catch (Exception e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
@@ -2712,11 +2696,7 @@ public class BigQueryIO {
// every thread from attempting a create and overwhelming our BigQuery quota.
DatasetService datasetService = bqServices.getDatasetService(options);
if (!createdTables.contains(tableSpec)) {
- Table table = datasetService.getTable(
- tableReference.getProjectId(),
- tableReference.getDatasetId(),
- tableReference.getTableId());
- if (table == null) {
+ if (datasetService.getTable(tableReference) == null) {
TableSchema tableSchema = JSON_FACTORY.fromString(
jsonTableSchema.get(), TableSchema.class);
datasetService.createTable(
http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 32cf46d..03e4391 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -119,8 +119,7 @@ interface BigQueryServices extends Serializable {
* <p>Returns null if the table is not found.
*/
@Nullable
- Table getTable(String projectId, String datasetId, String tableId)
- throws InterruptedException, IOException;
+ Table getTable(TableReference tableRef) throws InterruptedException, IOException;
/**
* Creates the specified table if it does not exist.
@@ -131,16 +130,14 @@ interface BigQueryServices extends Serializable {
* Deletes the table specified by tableId from the dataset.
* If the table contains data, all the data will be deleted.
*/
- void deleteTable(String projectId, String datasetId, String tableId)
- throws IOException, InterruptedException;
+ void deleteTable(TableReference tableRef) throws IOException, InterruptedException;
/**
* Returns true if the table is empty.
*
* @throws IOException if the table is not found.
*/
- boolean isTableEmpty(String projectId, String datasetId, String tableId)
- throws IOException, InterruptedException;
+ boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException;
/**
* Gets the specified {@link Dataset} resource by dataset ID.
http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c524ce4..75796ab 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -394,15 +394,12 @@ class BigQueryServicesImpl implements BigQueryServices {
*/
@Override
@Nullable
- public Table getTable(String projectId, String datasetId, String tableId)
+ public Table getTable(TableReference tableRef)
throws IOException, InterruptedException {
BackOff backoff =
FluentBackoff.DEFAULT
.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
- return getTable(
- new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId),
- backoff,
- Sleeper.DEFAULT);
+ return getTable(tableRef, backoff, Sleeper.DEFAULT);
}
@VisibleForTesting
@@ -506,31 +503,27 @@ class BigQueryServicesImpl implements BigQueryServices {
* @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
*/
@Override
- public void deleteTable(String projectId, String datasetId, String tableId)
- throws IOException, InterruptedException {
+ public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
BackOff backoff =
FluentBackoff.DEFAULT
.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
executeWithRetries(
- client.tables().delete(projectId, datasetId, tableId),
+ client.tables().delete(
+ tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
String.format(
"Unable to delete table: %s, aborting after %d retries.",
- tableId, MAX_RPC_RETRIES),
+ tableRef.getTableId(), MAX_RPC_RETRIES),
Sleeper.DEFAULT,
backoff,
ALWAYS_RETRY);
}
@Override
- public boolean isTableEmpty(String projectId, String datasetId, String tableId)
- throws IOException, InterruptedException {
+ public boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException {
BackOff backoff =
FluentBackoff.DEFAULT
.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
- return isTableEmpty(
- new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId),
- backoff,
- Sleeper.DEFAULT);
+ return isTableEmpty(tableRef, backoff, Sleeper.DEFAULT);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index ba7f44e..0b8d60d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
@@ -526,18 +527,18 @@ public class BigQueryIOTest implements Serializable {
private static class FakeDatasetService implements DatasetService, Serializable {
@Override
- public Table getTable(String projectId, String datasetId, String tableId)
+ public Table getTable(TableReference tableRef)
throws InterruptedException, IOException {
synchronized (tables) {
Map<String, TableContainer> dataset =
checkNotNull(
- tables.get(projectId, datasetId),
+ tables.get(tableRef.getProjectId(), tableRef.getDatasetId()),
"Tried to get a dataset %s:%s from %s, but no such dataset was set",
- projectId,
- datasetId,
- tableId,
+ tableRef.getProjectId(),
+ tableRef.getDatasetId(),
+ tableRef.getTableId(),
FakeDatasetService.class.getSimpleName());
- TableContainer tableContainer = dataset.get(tableId);
+ TableContainer tableContainer = dataset.get(tableRef.getTableId());
return tableContainer == null ? null : tableContainer.getTable();
}
}
@@ -569,8 +570,7 @@ public class BigQueryIOTest implements Serializable {
}
@Override
- public void deleteTable(String projectId, String datasetId, String tableId)
- throws IOException, InterruptedException {
+ public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
throw new UnsupportedOperationException("Unsupported");
}
@@ -595,9 +595,9 @@ public class BigQueryIOTest implements Serializable {
}
@Override
- public boolean isTableEmpty(String projectId, String datasetId, String tableId)
+ public boolean isTableEmpty(TableReference tableRef)
throws IOException, InterruptedException {
- Long numBytes = getTable(projectId, datasetId, tableId).getNumBytes();
+ Long numBytes = getTable(tableRef).getNumBytes();
return numBytes == null || numBytes == 0L;
}
@@ -1738,7 +1738,7 @@ public class BigQueryIOTest implements Serializable {
IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
when(mockIOChannelFactory.resolve(anyString(), anyString()))
.thenReturn("mock://tempLocation/output");
- when(mockDatasetService.getTable(anyString(), anyString(), anyString()))
+ when(mockDatasetService.getTable(any(TableReference.class)))
.thenReturn(new Table().setSchema(new TableSchema()));
Assert.assertThat(
@@ -1810,13 +1810,9 @@ public class BigQueryIOTest implements Serializable {
new JobStatistics2()
.setTotalBytesProcessed(100L)
.setReferencedTables(ImmutableList.of(queryTable))));
- when(mockDatasetService.getTable(
- eq(queryTable.getProjectId()), eq(queryTable.getDatasetId()), eq(queryTable.getTableId())))
+ when(mockDatasetService.getTable(eq(queryTable)))
.thenReturn(new Table().setSchema(new TableSchema()));
- when(mockDatasetService.getTable(
- eq(destinationTable.getProjectId()),
- eq(destinationTable.getDatasetId()),
- eq(destinationTable.getTableId())))
+ when(mockDatasetService.getTable(eq(destinationTable)))
.thenReturn(new Table().setSchema(new TableSchema()));
IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
when(mockIOChannelFactory.resolve(anyString(), anyString()))
@@ -1898,10 +1894,7 @@ public class BigQueryIOTest implements Serializable {
.thenReturn(new JobStatistics().setQuery(
new JobStatistics2()
.setTotalBytesProcessed(100L)));
- when(mockDatasetService.getTable(
- eq(destinationTable.getProjectId()),
- eq(destinationTable.getDatasetId()),
- eq(destinationTable.getTableId())))
+ when(mockDatasetService.getTable(eq(destinationTable)))
.thenReturn(new Table().setSchema(new TableSchema()));
IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
when(mockIOChannelFactory.resolve(anyString(), anyString()))
@@ -2263,9 +2256,9 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, tables.get(2))));
doThrow(new IOException("Unable to delete table"))
- .when(mockDatasetService).deleteTable(projectId, datasetId, tables.get(0));
- doNothing().when(mockDatasetService).deleteTable(projectId, datasetId, tables.get(1));
- doNothing().when(mockDatasetService).deleteTable(projectId, datasetId, tables.get(2));
+ .when(mockDatasetService).deleteTable(tableRefs.get(0));
+ doNothing().when(mockDatasetService).deleteTable(tableRefs.get(1));
+ doNothing().when(mockDatasetService).deleteTable(tableRefs.get(2));
WriteRename.removeTemporaryTables(mockDatasetService, tableRefs);
http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 8130238..7b5b226 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -370,7 +370,8 @@ public class BigQueryUtilTest {
BigQueryServicesImpl.DatasetServiceImpl services =
new BigQueryServicesImpl.DatasetServiceImpl(mockClient, options);
- services.getTable("project", "dataset", "table");
+ services.getTable(
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"));
verifyTableGet();
}