Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:47 UTC
[13/50] [abbrv] beam git commit: Fix tests to properly fake out
BigQueryService, and add tests for dynamic-table functionality.
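For orientation: the tests in this change route BigQueryIO through in-memory fakes instead of real RPCs. A minimal sketch of that wiring, assuming the suite's FakeBigQueryServices/FakeDatasetService helpers and BigQueryIO's test-only withTestServices(...) hook (names illustrative, not verbatim from this commit):

    // Build the fakes; the FakeJobService reworked below executes load/copy/
    // extract/query jobs against the same in-memory table store.
    FakeDatasetService datasetService = new FakeDatasetService();
    FakeBigQueryServices fakeServices = new FakeBigQueryServices()
        .withJobService(new FakeJobService())
        .withDatasetService(datasetService);

    TableSchema schema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("name").setType("STRING")));

    // p is a TestPipeline @Rule in the enclosing test class.
    p.apply(Create.of(new TableRow().set("name", "a"),
                      new TableRow().set("name", "b"))
            .withCoder(TableRowJsonCoder.of()))
     .apply(BigQueryIO.writeTableRows()
         .to("project-id:dataset_id.table_id")
         .withSchema(schema)
         .withTestServices(fakeServices)   // test-only injection point
         .withoutValidation());            // skip validation against real BigQuery
    p.run();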
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 3c67c3d..a2454fb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -1,12 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -29,9 +47,18 @@ import java.io.ByteArrayInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
@@ -40,10 +67,13 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.Transport;
import org.joda.time.Duration;
/**
+ * A fake implementation of BigQuery's job service.
*/
class FakeJobService implements JobService, Serializable {
static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
@@ -66,6 +96,8 @@ class FakeJobService implements JobService, Serializable {
private static final com.google.common.collect.Table<String, String, JobInfo> allJobs =
HashBasedTable.create();
+ private static final com.google.common.collect.Table<String, String, List<String>>
+ filesForLoadJobs = HashBasedTable.create();
private static final com.google.common.collect.Table<String, String, JobStatistics>
dryRunQueryResults = HashBasedTable.create();
@@ -82,6 +114,18 @@ class FakeJobService implements JobService, Serializable {
job.setConfiguration(new JobConfiguration().setLoad(loadConfig));
job.setKind(" bigquery#job");
job.setStatus(new JobStatus().setState("PENDING"));
+
+ // Copy the files to a new location for import, as the temporary files will be deleted by
+ // the caller.
+ if (loadConfig.getSourceUris().size() > 0) {
+ List<String> loadFiles = Lists.newArrayList();
+ for (String filename : loadConfig.getSourceUris()) {
+ loadFiles.add(filename + ThreadLocalRandom.current().nextInt());
+ }
+ IOChannelUtils.getFactory(loadFiles.get(0)).copy(loadConfig.getSourceUris(), loadFiles);
+ filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles);
+ }
+
allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job));
}
}
@@ -91,8 +135,6 @@ class FakeJobService implements JobService, Serializable {
throws InterruptedException, IOException {
checkArgument(extractConfig.getDestinationFormat().equals("AVRO"),
"Only extract to AVRO is supported");
- checkArgument(extractConfig.getDestinationUris().size() == 1,
- "Must specify exactly one destination URI.");
synchronized (allJobs) {
Job job = new Job();
job.setJobReference(jobRef);
@@ -106,6 +148,14 @@ class FakeJobService implements JobService, Serializable {
@Override
public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
throws IOException, InterruptedException {
+ synchronized (allJobs) {
+ Job job = new Job();
+ job.setJobReference(jobRef);
+ job.setConfiguration(new JobConfiguration().setQuery(query));
+ job.setKind(" bigquery#job");
+ job.setStatus(new JobStatus().setState("PENDING"));
+ allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job));
+ }
}
@Override
@@ -127,8 +177,8 @@ class FakeJobService implements JobService, Serializable {
BackOff backoff =
FluentBackoff.DEFAULT
.withMaxRetries(maxAttempts)
- .withInitialBackoff(Duration.millis(50))
- .withMaxBackoff(Duration.standardMinutes(1))
+ .withInitialBackoff(Duration.millis(10))
+ .withMaxBackoff(Duration.standardSeconds(1))
.backoff();
Sleeper sleeper = Sleeper.DEFAULT;
try {
@@ -136,7 +186,8 @@ class FakeJobService implements JobService, Serializable {
Job job = getJob(jobRef);
if (job != null) {
JobStatus status = job.getStatus();
- if (status != null && status.getState() != null && status.getState().equals("DONE")) {
+ if (status != null && status.getState() != null
+ && (status.getState().equals("DONE") || status.getState().equals("FAILED"))) {
return job;
}
}
@@ -173,12 +224,15 @@ class FakeJobService implements JobService, Serializable {
if (job == null) {
return null;
}
- ++job.getJobCount;
- if (job.getJobCount == GET_JOBS_TRANSITION_INTERVAL + 1) {
- job.job.getStatus().setState("RUNNING");
- } else if (job.getJobCount == 2 * GET_JOBS_TRANSITION_INTERVAL + 1) {
- runJob(job.job);
- job.job.getStatus().setState("DONE");
+ try {
+ ++job.getJobCount;
+ if (job.getJobCount == GET_JOBS_TRANSITION_INTERVAL + 1) {
+ job.job.getStatus().setState("RUNNING");
+ } else if (job.getJobCount == 2 * GET_JOBS_TRANSITION_INTERVAL + 1) {
+ job.job.setStatus(runJob(job.job));
+ }
+ } catch (Exception e) {
+ job.job.getStatus().setState("FAILED").setErrorResult(new ErrorProto());
}
return JSON_FACTORY.fromString(JSON_FACTORY.toString(job.job), Job.class);
}
@@ -187,41 +241,50 @@ class FakeJobService implements JobService, Serializable {
}
}
- private void runJob(Job job) throws InterruptedException, IOException {
+ private JobStatus runJob(Job job) throws InterruptedException, IOException {
if (job.getConfiguration().getLoad() != null) {
- runLoadJob(job.getConfiguration().getLoad());
+ return runLoadJob(job.getJobReference(), job.getConfiguration().getLoad());
} else if (job.getConfiguration().getCopy() != null) {
- runCopyJob(job.getConfiguration().getCopy());
+ return runCopyJob(job.getConfiguration().getCopy());
} else if (job.getConfiguration().getExtract() != null) {
- runExtractJob(job, job.getConfiguration().getExtract());
+ return runExtractJob(job, job.getConfiguration().getExtract());
+ } else if (job.getConfiguration().getQuery() != null) {
+ return runQueryJob(job.getConfiguration().getQuery());
}
+ return new JobStatus().setState("DONE");
}
- private void validateDispositions(Table table, CreateDisposition createDisposition,
- WriteDisposition writeDisposition)
+ private boolean validateDispositions(Table table, CreateDisposition createDisposition,
+ WriteDisposition writeDisposition)
throws InterruptedException, IOException {
if (table == null) {
- checkState(createDisposition != CreateDisposition.CREATE_NEVER,
- "CreateDisposition == CREATE_NEVER but the table doesn't exist.");
+ if (createDisposition == CreateDisposition.CREATE_NEVER) {
+ return false;
+ }
} else if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
datasetService.deleteTable(table.getTableReference());
} else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
List<TableRow> allRows = datasetService.getAllRows(table.getTableReference().getProjectId(),
table.getTableReference().getDatasetId(), table.getTableReference().getTableId());
- checkState(allRows.isEmpty(), "Write disposition was set to WRITE_EMPTY,"
- + " but the table was not empty.");
+ if (!allRows.isEmpty()) {
+ return false;
+ }
}
+ return true;
}
- private void runLoadJob(JobConfigurationLoad load)
+
+ private JobStatus runLoadJob(JobReference jobRef, JobConfigurationLoad load)
throws InterruptedException, IOException {
TableReference destination = load.getDestinationTable();
TableSchema schema = load.getSchema();
- List<String> sourceFiles = load.getSourceUris();
+ List<String> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
WriteDisposition writeDisposition = WriteDisposition.valueOf(load.getWriteDisposition());
CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition());
checkArgument(load.getSourceFormat().equals("NEWLINE_DELIMITED_JSON"));
Table existingTable = datasetService.getTable(destination);
- validateDispositions(existingTable, createDisposition, writeDisposition);
+ if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
+ return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
+ }
datasetService.createTable(new Table().setTableReference(destination).setSchema(schema));
@@ -230,31 +293,52 @@ class FakeJobService implements JobService, Serializable {
rows.addAll(readRows(filename));
}
datasetService.insertAll(destination, rows, null);
+ return new JobStatus().setState("DONE");
}
- private void runCopyJob(JobConfigurationTableCopy copy)
+ private JobStatus runCopyJob(JobConfigurationTableCopy copy)
throws InterruptedException, IOException {
List<TableReference> sources = copy.getSourceTables();
TableReference destination = copy.getDestinationTable();
WriteDisposition writeDisposition = WriteDisposition.valueOf(copy.getWriteDisposition());
CreateDisposition createDisposition = CreateDisposition.valueOf(copy.getCreateDisposition());
Table existingTable = datasetService.getTable(destination);
- validateDispositions(existingTable, createDisposition, writeDisposition);
+ if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
+ return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
+ }
List<TableRow> allRows = Lists.newArrayList();
for (TableReference source : sources) {
allRows.addAll(datasetService.getAllRows(
source.getProjectId(), source.getDatasetId(), source.getTableId()));
}
+ datasetService.createTable(new Table().setTableReference(destination));
datasetService.insertAll(destination, allRows, null);
+ return new JobStatus().setState("DONE");
}
- private void runExtractJob(Job job, JobConfigurationExtract extract) {
+ private JobStatus runExtractJob(Job job, JobConfigurationExtract extract)
+ throws InterruptedException, IOException {
TableReference sourceTable = extract.getSourceTable();
- extract.getDestinationUris().get(0);
- List<Long> destinationFileCounts = Lists.newArrayList(0L);
+
+ List<TableRow> rows = datasetService.getAllRows(
+ sourceTable.getProjectId(), sourceTable.getDatasetId(), sourceTable.getTableId());
+ TableSchema schema = datasetService.getTable(sourceTable).getSchema();
+ List<Long> destinationFileCounts = Lists.newArrayList();
+ for (String destination : extract.getDestinationUris()) {
+ destinationFileCounts.add(writeRows(sourceTable.getTableId(), rows, schema, destination));
+ }
job.setStatistics(new JobStatistics().setExtract(
new JobStatistics4().setDestinationUriFileCounts(destinationFileCounts)));
+ return new JobStatus().setState("DONE");
+ }
+
+ private JobStatus runQueryJob(JobConfigurationQuery query)
+ throws IOException, InterruptedException {
+ List<TableRow> rows = FakeBigQueryServices.rowsFromEncodedQuery(query.getQuery());
+ datasetService.createTable(new Table().setTableReference(query.getDestinationTable()));
+ datasetService.insertAll(query.getDestinationTable(), rows, null);
+ return new JobStatus().setState("DONE");
}
private List<TableRow> readRows(String filename) throws IOException {
@@ -270,4 +354,42 @@ class FakeJobService implements JobService, Serializable {
}
return tableRows;
}
+
+ private long writeRows(String tableId, List<TableRow> rows, TableSchema schema,
+ String destinationPattern) throws IOException {
+ Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableId, schema.getFields());
+ List<TableRow> rowsToWrite = Lists.newArrayList();
+ int shard = 0;
+ for (int i = 0; i < rows.size(); ++i) {
+ rowsToWrite.add(rows.get(i));
+ if (rowsToWrite.size() == 5) {
+ writeRowsHelper(rowsToWrite, avroSchema, destinationPattern, shard++);
+ rowsToWrite.clear();
+ }
+ }
+ if (!rowsToWrite.isEmpty()) {
+ writeRowsHelper(rowsToWrite, avroSchema, destinationPattern, shard++);
+ }
+ return shard;
+ }
+
+ private void writeRowsHelper(List<TableRow> rows, Schema avroSchema,
+ String destinationPattern, int shard) throws IOException {
+ String filename = destinationPattern.replace("*", String.format("%012d", shard));
+ try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.BINARY);
+ DataFileWriter<GenericRecord> tableRowWriter =
+ new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(avroSchema))
+ .create(avroSchema, Channels.newOutputStream(channel))) {
+ for (Map<String, Object> record : rows) {
+ GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(avroSchema);
+ for (Map.Entry<String, Object> field : record.entrySet()) {
+ genericRecordBuilder.set(field.getKey(), field.getValue());
+ }
+ tableRowWriter.append(genericRecordBuilder.build());
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ String.format("Could not create destination for extract job %s", filename), e);
+ }
+ }
}
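Two behavioral notes on the hunks above: the poll backoff is tightened (10 ms initial, 1 s cap) so tests run fast, and getJob now walks each job PENDING -> RUNNING -> DONE (or FAILED) once per GET_JOBS_TRANSITION_INTERVAL polls, with FAILED treated as terminal. The caller-side contract, as a standalone sketch (jobService and jobRef are placeholders; the real pollJob uses FluentBackoff rather than a fixed sleep):

    // Poll until the fake advances the job to a terminal state.
    Job job;
    while (true) {
      job = jobService.getJob(jobRef);  // each poll may advance the state machine
      String state = (job == null || job.getStatus() == null)
          ? null : job.getStatus().getState();
      if ("DONE".equals(state) || "FAILED".equals(state)) {
        break;  // terminal: the fake has run (or failed) the underlying job
      }
      Thread.sleep(10);  // stand-in for BackOffUtils.next(sleeper, backoff)
    }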
http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
index b2fc170..d52723b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.Table;
@@ -7,23 +24,31 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Created by relax on 3/30/17.
+ * Encapsulates a BigQuery Table and its contents.
*/
class TableContainer {
Table table;
List<TableRow> rows;
List<String> ids;
-
+ Long sizeBytes;
TableContainer(Table table) {
this.table = table;
this.rows = new ArrayList<>();
this.ids = new ArrayList<>();
+ this.sizeBytes = 0L;
}
- TableContainer addRow(TableRow row, String id) {
+ long addRow(TableRow row, String id) {
rows.add(row);
ids.add(id);
- return this;
+ long rowSize = row.toString().length();
+ Long tableSize = table.getNumBytes();
+ if (tableSize == null) {
+ table.setNumBytes(rowSize);
+ } else {
+ table.setNumBytes(tableSize + rowSize);
+ }
+ return rowSize;
}
Table getTable() {
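The reworked addRow above approximates a row's size as the length of its JSON rendering and accumulates it into the table's numBytes, returning the per-row size to the caller. A standalone illustration of that accounting (ref and the field values are hypothetical):

    // Each addRow grows table.getNumBytes() by row.toString().length().
    TableReference ref = new TableReference()
        .setProjectId("p").setDatasetId("d").setTableId("t");
    TableContainer container = new TableContainer(
        new Table().setTableReference(ref));

    TableRow row = new TableRow().set("name", "beam").set("count", 42);
    long rowSize = container.addRow(row, "insert-id-1");
    assert rowSize == row.toString().length();
    assert container.getTable().getNumBytes() == rowSize;  // running total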