Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:47 UTC

[13/50] [abbrv] beam git commit: Fix tests to properly fake out BigQueryService, and add tests for dynamic-table functionality.
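
The fakes touched by this commit (FakeJobService, FakeBigQueryServices, and a fake dataset service) let the BigQueryIO write tests run entirely in-process. A rough sketch of how a test might wire them into a write follows; the FakeDatasetService name and the exact builder calls (withJobService, withDatasetService, withTestServices, writeTableRows) are assumptions based on the surrounding test code, not verbatim from this commit, and the dynamic-table tests would swap the fixed .to(...) destination for a per-element destination function.

    // Sketch only, assuming a TestPipeline p and the fake classes from this test package.
    FakeDatasetService datasetService = new FakeDatasetService();
    FakeJobService jobService = new FakeJobService();
    FakeBigQueryServices fakeServices = new FakeBigQueryServices()
        .withJobService(jobService)
        .withDatasetService(datasetService);

    p.apply(Create.of(new TableRow().set("name", "a"), new TableRow().set("name", "b"))
            .withCoder(TableRowJsonCoder.of()))
     .apply(BigQueryIO.writeTableRows()
         .to("test-project:dataset.table")  // hypothetical destination
         .withSchema(new TableSchema().setFields(ImmutableList.of(
             new TableFieldSchema().setName("name").setType("STRING"))))
         .withTestServices(fakeServices)
         .withoutValidation());
    p.run();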

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 3c67c3d..a2454fb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -1,12 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfiguration;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -29,9 +47,18 @@ import java.io.ByteArrayInputStream;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
@@ -40,10 +67,13 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.util.FluentBackoff;
 
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.Transport;
 import org.joda.time.Duration;
 
 /**
+ * A fake implementation of BigQuery's job service.
  */
 class FakeJobService implements JobService, Serializable {
   static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
@@ -66,6 +96,8 @@ class FakeJobService implements JobService, Serializable {
   private static final com.google.common.collect.Table<String, String, JobInfo> allJobs =
       HashBasedTable.create();
 
+  private static final com.google.common.collect.Table<String, String, List<String>>
+      filesForLoadJobs = HashBasedTable.create();
   private static final com.google.common.collect.Table<String, String, JobStatistics>
       dryRunQueryResults = HashBasedTable.create();
 
@@ -82,6 +114,18 @@ class FakeJobService implements JobService, Serializable {
       job.setConfiguration(new JobConfiguration().setLoad(loadConfig));
       job.setKind(" bigquery#job");
       job.setStatus(new JobStatus().setState("PENDING"));
+
+      // Copy the files to a new location for import, as the temporary files will be deleted by
+      // the caller.
+      if (loadConfig.getSourceUris().size() > 0) {
+        List<String> loadFiles = Lists.newArrayList();
+        for (String filename : loadConfig.getSourceUris()) {
+          loadFiles.add(filename + ThreadLocalRandom.current().nextInt());
+        }
+        IOChannelUtils.getFactory(loadFiles.get(0)).copy(loadConfig.getSourceUris(), loadFiles);
+        filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles);
+      }
+
       allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job));
     }
   }
@@ -91,8 +135,6 @@ class FakeJobService implements JobService, Serializable {
       throws InterruptedException, IOException {
     checkArgument(extractConfig.getDestinationFormat().equals("AVRO"),
         "Only extract to AVRO is supported");
-    checkArgument(extractConfig.getDestinationUris().size() == 1,
-        "Must specify exactly one destination URI.");
     synchronized (allJobs) {
       Job job = new Job();
       job.setJobReference(jobRef);
@@ -106,6 +148,14 @@ class FakeJobService implements JobService, Serializable {
   @Override
   public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
       throws IOException, InterruptedException {
+    synchronized (allJobs) {
+      Job job = new Job();
+      job.setJobReference(jobRef);
+      job.setConfiguration(new JobConfiguration().setQuery(query));
+      job.setKind(" bigquery#job");
+      job.setStatus(new JobStatus().setState("PENDING"));
+      allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job));
+    }
   }
 
   @Override
@@ -127,8 +177,8 @@ class FakeJobService implements JobService, Serializable {
     BackOff backoff =
         FluentBackoff.DEFAULT
             .withMaxRetries(maxAttempts)
-            .withInitialBackoff(Duration.millis(50))
-            .withMaxBackoff(Duration.standardMinutes(1))
+            .withInitialBackoff(Duration.millis(10))
+            .withMaxBackoff(Duration.standardSeconds(1))
             .backoff();
     Sleeper sleeper = Sleeper.DEFAULT;
     try {
@@ -136,7 +186,8 @@ class FakeJobService implements JobService, Serializable {
         Job job = getJob(jobRef);
         if (job != null) {
           JobStatus status = job.getStatus();
-          if (status != null && status.getState() != null && status.getState().equals("DONE")) {
+          if (status != null && status.getState() != null
+              && (status.getState().equals("DONE") || status.getState().equals("FAILED"))) {
             return job;
           }
         }
@@ -173,12 +224,15 @@ class FakeJobService implements JobService, Serializable {
         if (job == null) {
           return null;
         }
-        ++job.getJobCount;
-        if (job.getJobCount == GET_JOBS_TRANSITION_INTERVAL + 1) {
-          job.job.getStatus().setState("RUNNING");
-        } else if (job.getJobCount == 2 * GET_JOBS_TRANSITION_INTERVAL + 1) {
-          runJob(job.job);
-          job.job.getStatus().setState("DONE");
+        try {
+          ++job.getJobCount;
+          if (job.getJobCount == GET_JOBS_TRANSITION_INTERVAL + 1) {
+            job.job.getStatus().setState("RUNNING");
+          } else if (job.getJobCount == 2 * GET_JOBS_TRANSITION_INTERVAL + 1) {
+            job.job.setStatus(runJob(job.job));
+          }
+        } catch (Exception e) {
+          job.job.getStatus().setState("FAILED").setErrorResult(new ErrorProto());
         }
         return JSON_FACTORY.fromString(JSON_FACTORY.toString(job.job), Job.class);
       }
@@ -187,41 +241,50 @@ class FakeJobService implements JobService, Serializable {
     }
   }
 
-  private void runJob(Job job) throws InterruptedException, IOException {
+  private JobStatus runJob(Job job) throws InterruptedException, IOException {
     if (job.getConfiguration().getLoad() != null) {
-      runLoadJob(job.getConfiguration().getLoad());
+      return runLoadJob(job.getJobReference(), job.getConfiguration().getLoad());
     } else if (job.getConfiguration().getCopy() != null) {
-      runCopyJob(job.getConfiguration().getCopy());
+      return runCopyJob(job.getConfiguration().getCopy());
     } else if (job.getConfiguration().getExtract() != null) {
-      runExtractJob(job, job.getConfiguration().getExtract());
+      return runExtractJob(job, job.getConfiguration().getExtract());
+    } else if (job.getConfiguration().getQuery() != null) {
+      return runQueryJob(job.getConfiguration().getQuery());
     }
+    return new JobStatus().setState("DONE");
   }
 
-  private void validateDispositions(Table table, CreateDisposition createDisposition,
-                                    WriteDisposition writeDisposition)
+  private boolean validateDispositions(Table table, CreateDisposition createDisposition,
+                                       WriteDisposition writeDisposition)
       throws InterruptedException, IOException {
     if (table == null) {
-      checkState(createDisposition != CreateDisposition.CREATE_NEVER,
-          "CreateDisposition == CREATE_NEVER but the table doesn't exist.");
+      if (createDisposition == CreateDisposition.CREATE_NEVER) {
+        return false;
+      }
     } else if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
       datasetService.deleteTable(table.getTableReference());
     } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
       List<TableRow> allRows = datasetService.getAllRows(table.getTableReference().getProjectId(),
           table.getTableReference().getDatasetId(), table.getTableReference().getTableId());
-      checkState(allRows.isEmpty(), "Write disposition was set to WRITE_EMPTY,"
-          + " but the table was not empty.");
+      if (!allRows.isEmpty()) {
+        return false;
+      }
     }
+    return true;
   }
-  private void runLoadJob(JobConfigurationLoad load)
+
+  private JobStatus runLoadJob(JobReference jobRef, JobConfigurationLoad load)
       throws InterruptedException, IOException {
     TableReference destination = load.getDestinationTable();
     TableSchema schema = load.getSchema();
-    List<String> sourceFiles = load.getSourceUris();
+    List<String> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
     WriteDisposition writeDisposition = WriteDisposition.valueOf(load.getWriteDisposition());
     CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition());
     checkArgument(load.getSourceFormat().equals("NEWLINE_DELIMITED_JSON"));
     Table existingTable = datasetService.getTable(destination);
-    validateDispositions(existingTable, createDisposition, writeDisposition);
+    if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
+      return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
+    }
 
     datasetService.createTable(new Table().setTableReference(destination).setSchema(schema));
 
@@ -230,31 +293,52 @@ class FakeJobService implements JobService, Serializable {
       rows.addAll(readRows(filename));
     }
     datasetService.insertAll(destination, rows, null);
+    return new JobStatus().setState("DONE");
   }
 
-  private void runCopyJob(JobConfigurationTableCopy copy)
+  private JobStatus runCopyJob(JobConfigurationTableCopy copy)
       throws InterruptedException, IOException {
     List<TableReference> sources = copy.getSourceTables();
     TableReference destination = copy.getDestinationTable();
     WriteDisposition writeDisposition = WriteDisposition.valueOf(copy.getWriteDisposition());
     CreateDisposition createDisposition = CreateDisposition.valueOf(copy.getCreateDisposition());
     Table existingTable = datasetService.getTable(destination);
-    validateDispositions(existingTable, createDisposition, writeDisposition);
+    if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
+      return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
+    }
 
     List<TableRow> allRows = Lists.newArrayList();
     for (TableReference source : sources) {
       allRows.addAll(datasetService.getAllRows(
           source.getProjectId(), source.getDatasetId(), source.getTableId()));
     }
+    datasetService.createTable(new Table().setTableReference(destination));
     datasetService.insertAll(destination, allRows, null);
+    return new JobStatus().setState("DONE");
   }
 
-  private void runExtractJob(Job job, JobConfigurationExtract extract) {
+  private JobStatus runExtractJob(Job job, JobConfigurationExtract extract)
+      throws InterruptedException, IOException {
     TableReference sourceTable = extract.getSourceTable();
-    extract.getDestinationUris().get(0);
-    List<Long> destinationFileCounts = Lists.newArrayList(0L);
+
+    List<TableRow> rows = datasetService.getAllRows(
+        sourceTable.getProjectId(), sourceTable.getDatasetId(), sourceTable.getTableId());
+    TableSchema schema = datasetService.getTable(sourceTable).getSchema();
+    List<Long> destinationFileCounts = Lists.newArrayList();
+    for (String destination : extract.getDestinationUris()) {
+      destinationFileCounts.add(writeRows(sourceTable.getTableId(), rows, schema, destination));
+    }
     job.setStatistics(new JobStatistics().setExtract(
         new JobStatistics4().setDestinationUriFileCounts(destinationFileCounts)));
+    return new JobStatus().setState("DONE");
+  }
+
+  private JobStatus runQueryJob(JobConfigurationQuery query)
+      throws IOException, InterruptedException  {
+    List<TableRow> rows = FakeBigQueryServices.rowsFromEncodedQuery(query.getQuery());
+    datasetService.createTable(new Table().setTableReference(query.getDestinationTable()));
+    datasetService.insertAll(query.getDestinationTable(), rows, null);
+    return new JobStatus().setState("DONE");
   }
 
   private List<TableRow> readRows(String filename) throws IOException {
@@ -270,4 +354,42 @@ class FakeJobService implements JobService, Serializable {
     }
     return tableRows;
   }
+
+  private long writeRows(String tableId, List<TableRow> rows, TableSchema schema,
+                         String destinationPattern) throws IOException {
+    Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableId, schema.getFields());
+    List<TableRow> rowsToWrite = Lists.newArrayList();
+    int shard = 0;
+    for (int i = 0; i < rows.size(); ++i) {
+      rowsToWrite.add(rows.get(i));
+      if (rowsToWrite.size() == 5) {
+        writeRowsHelper(rowsToWrite, avroSchema, destinationPattern, shard++);
+        rowsToWrite.clear();
+      }
+    }
+    if (!rowsToWrite.isEmpty()) {
+      writeRowsHelper(rowsToWrite, avroSchema, destinationPattern, shard++);
+    }
+    return shard;
+  }
+
+  private void writeRowsHelper(List<TableRow> rows, Schema avroSchema,
+                               String destinationPattern, int shard) throws IOException {
+    String filename = destinationPattern.replace("*", String.format("%012d", shard));
+    try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.BINARY);
+         DataFileWriter<GenericRecord> tableRowWriter =
+             new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(avroSchema))
+                 .create(avroSchema, Channels.newOutputStream(channel))) {
+      for (Map<String, Object> record : rows) {
+        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(avroSchema);
+        for (Map.Entry<String, Object> field : record.entrySet()) {
+          genericRecordBuilder.set(field.getKey(), field.getValue());
+        }
+        tableRowWriter.append(genericRecordBuilder.build());
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          String.format("Could not create destination for extract job %s", filename), e);
+    }
+  }
 }
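
The fake above mirrors BigQuery's asynchronous job lifecycle: startLoadJob snapshots the source files and parks the job in PENDING, repeated polling moves it to RUNNING and then runs it to DONE, and a failed disposition check now surfaces as a FAILED status with an ErrorProto instead of an exception. A minimal sketch of driving that state machine directly, assuming jobService is a FakeJobService already wired to a fake dataset service and that the source URI points at a real newline-delimited JSON file written by the test (all ids below are hypothetical):

    List<String> sourceUris = ImmutableList.of("/tmp/rows-00000.json");  // written by the test beforehand
    TableSchema schema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("name").setType("STRING")));
    JobReference jobRef = new JobReference().setProjectId("test-project").setJobId("load-job-0");
    JobConfigurationLoad load = new JobConfigurationLoad()
        .setDestinationTable(new TableReference()
            .setProjectId("test-project").setDatasetId("dataset").setTableId("table"))
        .setSchema(schema)
        .setSourceUris(sourceUris)
        .setSourceFormat("NEWLINE_DELIMITED_JSON")
        .setWriteDisposition("WRITE_APPEND")
        .setCreateDisposition("CREATE_IF_NEEDED");
    jobService.startLoadJob(jobRef, load);
    Job done = jobService.pollJob(jobRef, 25);  // enough attempts to cross PENDING -> RUNNING -> DONE
    // done.getStatus().getState() should be "DONE" once the fake has loaded the rows.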

http://git-wip-us.apache.org/repos/asf/beam/blob/b486137d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
index b2fc170..d52723b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.Table;
@@ -7,23 +24,31 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Created by relax on 3/30/17.
+ * Encapsulates a BigQuery Table and its contents.
  */
 class TableContainer {
   Table table;
   List<TableRow> rows;
   List<String> ids;
-
+  Long sizeBytes;
   TableContainer(Table table) {
     this.table = table;
     this.rows = new ArrayList<>();
     this.ids = new ArrayList<>();
+    this.sizeBytes = 0L;
   }
 
-  TableContainer addRow(TableRow row, String id) {
+  long addRow(TableRow row, String id) {
     rows.add(row);
     ids.add(id);
-    return this;
+    long rowSize =  row.toString().length();
+    Long tableSize = table.getNumBytes();
+    if (tableSize == null) {
+      table.setNumBytes(rowSize);
+    } else {
+      table.setNumBytes(tableSize + rowSize);
+    }
+    return rowSize;
   }
 
   Table getTable() {