Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:51 UTC

[17/50] [abbrv] beam git commit: Refactor batch loads, and add support for windowed writes.

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
new file mode 100644
index 0000000..9b2cf63
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -0,0 +1,172 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+
+/** A fake dataset service that can be serialized, for use in BigQueryIO tests. */
+class FakeDatasetService implements DatasetService, Serializable {
+  @Override
+  public Table getTable(TableReference tableRef)
+      throws InterruptedException, IOException {
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset =
+          checkNotNull(
+              BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId()),
+              "Tried to get a dataset %s:%s from %s, but no such dataset was set",
+              tableRef.getProjectId(),
+              tableRef.getDatasetId(),
+              FakeDatasetService.class.getSimpleName());
+      TableContainer tableContainer = dataset.get(tableRef.getTableId());
+      return tableContainer == null ? null : tableContainer.getTable();
+    }
+  }
+
+  List<TableRow> getAllRows(String projectId, String datasetId, String tableId)
+      throws InterruptedException, IOException {
+    synchronized (BigQueryIOTest.tables) {
+      return getTableContainer(projectId, datasetId, tableId).getRows();
+    }
+  }
+
+  private TableContainer getTableContainer(String projectId, String datasetId, String tableId)
+      throws InterruptedException, IOException {
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset =
+          checkNotNull(
+              BigQueryIOTest.tables.get(projectId, datasetId),
+              "Tried to get a dataset %s:%s from %s, but no such dataset was set",
+              projectId,
+              datasetId,
+              FakeDatasetService.class.getSimpleName());
+      return checkNotNull(dataset.get(tableId),
+          "Tried to get a table %s:%s.%s from %s, but no such table was set",
+          projectId,
+          datasetId,
+          tableId,
+          FakeDatasetService.class.getSimpleName());
+    }
+  }
+
+  @Override
+  public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Unsupported");
+  }
+
+  @Override
+  public void createTable(Table table) throws IOException {
+    TableReference tableReference = table.getTableReference();
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset =
+          checkNotNull(
+              BigQueryIOTest.tables.get(tableReference.getProjectId(),
+                  tableReference.getDatasetId()),
+              "Tried to get a dataset %s:%s from %s, but no such table was set",
+              tableReference.getProjectId(),
+              tableReference.getDatasetId(),
+              FakeDatasetService.class.getSimpleName());
+      TableContainer tableContainer = dataset.get(tableReference.getTableId());
+      if (tableContainer == null) {
+        tableContainer = new TableContainer(table);
+        dataset.put(tableReference.getTableId(), tableContainer);
+      }
+    }
+  }
+
+  @Override
+  public boolean isTableEmpty(TableReference tableRef)
+      throws IOException, InterruptedException {
+    Long numBytes = getTable(tableRef).getNumBytes();
+    return numBytes == null || numBytes == 0L;
+  }
+
+  @Override
+  public Dataset getDataset(
+      String projectId, String datasetId) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Unsupported");
+  }
+
+  @Override
+  public void createDataset(
+      String projectId, String datasetId, String location, String description)
+      throws IOException, InterruptedException {
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(projectId, datasetId);
+      if (dataset == null) {
+        dataset = new HashMap<>();
+        BigQueryIOTest.tables.put(projectId, datasetId, dataset);
+      }
+    }
+  }
+
+  @Override
+  public void deleteDataset(String projectId, String datasetId)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Unsupported");
+  }
+
+  @Override
+  public long insertAll(
+      TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+      throws IOException, InterruptedException {
+    synchronized (BigQueryIOTest.tables) {
+      if (insertIdList != null) {
+        assertEquals(rowList.size(), insertIdList.size());
+      } else {
+        insertIdList = Lists.newArrayListWithExpectedSize(rowList.size());
+        for (int i = 0; i < rowList.size(); ++i) {
+          insertIdList.add(Integer.toString(ThreadLocalRandom.current().nextInt()));
+        }
+      }
+
+      long dataSize = 0;
+      TableContainer tableContainer = getTableContainer(
+          ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+      for (int i = 0; i < rowList.size(); ++i) {
+        tableContainer.addRow(rowList.get(i), insertIdList.get(i));
+        dataSize += rowList.get(i).toString().length();
+      }
+      return dataSize;
+    }
+  }
+
+  @Override
+  public Table patchTableDescription(TableReference tableReference,
+                                     @Nullable String tableDescription)
+      throws IOException, InterruptedException {
+    synchronized (BigQueryIOTest.tables) {
+      Map<String, TableContainer> dataset =
+          checkNotNull(
+              BigQueryIOTest.tables.get(tableReference.getProjectId(),
+                  tableReference.getDatasetId()),
+              "Tried to get a dataset %s:%s from %s, but no such dataset was set",
+              tableReference.getProjectId(),
+              tableReference.getDatasetId(),
+              FakeDatasetService.class.getSimpleName());
+      TableContainer tableContainer = checkNotNull(dataset.get(tableReference.getTableId()),
+          "Tried to patch a table %s:%s.%s from %s, but no such table was set",
+          tableReference.getProjectId(),
+          tableReference.getDatasetId(),
+          tableReference.getTableId(),
+          FakeDatasetService.class.getSimpleName());
+      tableContainer.getTable().setDescription(tableDescription);
+      return tableContainer.getTable();
+    }
+  }
+}
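
A minimal usage sketch of this fake (illustrative only, not part of the commit; the project,
dataset and table names are made up, and it assumes the shared BigQueryIOTest.tables map has
been reset by the test's setup):

  // Hypothetical test snippet; exercises only methods defined in FakeDatasetService above.
  FakeDatasetService datasetService = new FakeDatasetService();
  datasetService.createDataset("project", "dataset", "", "");
  TableReference ref = new TableReference()
      .setProjectId("project").setDatasetId("dataset").setTableId("table");
  datasetService.createTable(new Table().setTableReference(ref));
  datasetService.insertAll(ref, Collections.singletonList(new TableRow().set("name", "a")), null);
  // getAllRows should now return the single row inserted above.
  assertEquals(1, datasetService.getAllRows("project", "dataset", "table").size());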

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
new file mode 100644
index 0000000..3c67c3d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -0,0 +1,273 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfiguration;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.JobStatistics4;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Lists;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Transport;
+import org.joda.time.Duration;
+
+/**
+ * A fake BigQuery {@link JobService} for tests. It simulates load, extract, copy, and dry-run
+ * query jobs, backed by {@link FakeDatasetService}.
+ */
+class FakeJobService implements JobService, Serializable {
+  static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
+
+  // Whenever a job is started, the first five calls to getJob will report the job as PENDING,
+  // the next five as RUNNING, and only after that will the job be reported as DONE.
+  private static final int GET_JOBS_TRANSITION_INTERVAL = 5;
+
+  private FakeDatasetService datasetService;
+
+  private static class JobInfo {
+    Job job;
+    int getJobCount = 0;
+
+    JobInfo(Job job) {
+      this.job = job;
+    }
+  }
+
+  private static final com.google.common.collect.Table<String, String, JobInfo> allJobs =
+      HashBasedTable.create();
+
+  private static final com.google.common.collect.Table<String, String, JobStatistics>
+      dryRunQueryResults = HashBasedTable.create();
+
+  FakeJobService() {
+    this.datasetService = new FakeDatasetService();
+  }
+
+  @Override
+  public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
+      throws InterruptedException, IOException {
+    synchronized (allJobs) {
+      Job job = new Job();
+      job.setJobReference(jobRef);
+      job.setConfiguration(new JobConfiguration().setLoad(loadConfig));
+      job.setKind(" bigquery#job");
+      job.setStatus(new JobStatus().setState("PENDING"));
+      allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job));
+    }
+  }
+
+  @Override
+  public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
+      throws InterruptedException, IOException {
+    checkArgument(extractConfig.getDestinationFormat().equals("AVRO"),
+        "Only extract to AVRO is supported");
+    checkArgument(extractConfig.getDestinationUris().size() == 1,
+        "Must specify exactly one destination URI.");
+    synchronized (allJobs) {
+      Job job = new Job();
+      job.setJobReference(jobRef);
+      job.setConfiguration(new JobConfiguration().setExtract(extractConfig));
+      job.setKind(" bigquery#job");
+      job.setStatus(new JobStatus().setState("PENDING"));
+      allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job));
+    }
+  }
+
+  @Override
+  public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
+      throws IOException, InterruptedException {
+    // Query jobs are not simulated by this fake service.
+  }
+
+  @Override
+  public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
+      throws IOException, InterruptedException {
+    synchronized (allJobs) {
+      Job job = new Job();
+      job.setJobReference(jobRef);
+      job.setConfiguration(new JobConfiguration().setCopy(copyConfig));
+      job.setKind(" bigquery#job");
+      job.setStatus(new JobStatus().setState("PENDING"));
+      allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job));
+    }
+  }
+
+  @Override
+  public Job pollJob(JobReference jobRef, int maxAttempts)
+      throws InterruptedException {
+    BackOff backoff =
+        FluentBackoff.DEFAULT
+            .withMaxRetries(maxAttempts)
+            .withInitialBackoff(Duration.millis(50))
+            .withMaxBackoff(Duration.standardMinutes(1))
+            .backoff();
+    Sleeper sleeper = Sleeper.DEFAULT;
+    try {
+      do {
+        Job job = getJob(jobRef);
+        if (job != null) {
+          JobStatus status = job.getStatus();
+          if (status != null && status.getState() != null && status.getState().equals("DONE")) {
+            return job;
+          }
+        }
+      } while (BackOffUtils.next(sleeper, backoff));
+    } catch (IOException e) {
+      return null;
+    }
+    return null;
+  }
+
+  public void expectDryRunQuery(String projectId, String query, JobStatistics result) {
+    synchronized (dryRunQueryResults) {
+      dryRunQueryResults.put(projectId, query, result);
+    }
+  }
+
+  @Override
+  public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query)
+      throws InterruptedException, IOException {
+    synchronized (dryRunQueryResults) {
+      JobStatistics result = dryRunQueryResults.get(projectId, query.getQuery());
+      if (result != null) {
+        return result;
+      }
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Job getJob(JobReference jobRef) throws InterruptedException {
+    try {
+      synchronized (allJobs) {
+        JobInfo job = allJobs.get(jobRef.getProjectId(), jobRef.getJobId());
+        if (job == null) {
+          return null;
+        }
+        ++job.getJobCount;
+        if (job.getJobCount == GET_JOBS_TRANSITION_INTERVAL + 1) {
+          job.job.getStatus().setState("RUNNING");
+        } else if (job.getJobCount == 2 * GET_JOBS_TRANSITION_INTERVAL + 1) {
+          runJob(job.job);
+          job.job.getStatus().setState("DONE");
+        }
+        return JSON_FACTORY.fromString(JSON_FACTORY.toString(job.job), Job.class);
+      }
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  private void runJob(Job job) throws InterruptedException, IOException {
+    if (job.getConfiguration().getLoad() != null) {
+      runLoadJob(job.getConfiguration().getLoad());
+    } else if (job.getConfiguration().getCopy() != null) {
+      runCopyJob(job.getConfiguration().getCopy());
+    } else if (job.getConfiguration().getExtract() != null) {
+      runExtractJob(job, job.getConfiguration().getExtract());
+    }
+  }
+
+  private void validateDispositions(Table table, CreateDisposition createDisposition,
+                                    WriteDisposition writeDisposition)
+      throws InterruptedException, IOException {
+    if (table == null) {
+      checkState(createDisposition != CreateDisposition.CREATE_NEVER,
+          "CreateDisposition == CREATE_NEVER but the table doesn't exist.");
+    } else if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
+      datasetService.deleteTable(table.getTableReference());
+    } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
+      List<TableRow> allRows = datasetService.getAllRows(table.getTableReference().getProjectId(),
+          table.getTableReference().getDatasetId(), table.getTableReference().getTableId());
+      checkState(allRows.isEmpty(), "Write disposition was set to WRITE_EMPTY,"
+          + " but the table was not empty.");
+    }
+  }
+
+  private void runLoadJob(JobConfigurationLoad load)
+      throws InterruptedException, IOException {
+    TableReference destination = load.getDestinationTable();
+    TableSchema schema = load.getSchema();
+    List<String> sourceFiles = load.getSourceUris();
+    WriteDisposition writeDisposition = WriteDisposition.valueOf(load.getWriteDisposition());
+    CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition());
+    checkArgument(load.getSourceFormat().equals("NEWLINE_DELIMITED_JSON"),
+        "Only NEWLINE_DELIMITED_JSON source format is supported");
+    Table existingTable = datasetService.getTable(destination);
+    validateDispositions(existingTable, createDisposition, writeDisposition);
+
+    datasetService.createTable(new Table().setTableReference(destination).setSchema(schema));
+
+    List<TableRow> rows = Lists.newArrayList();
+    for (String filename : sourceFiles) {
+      rows.addAll(readRows(filename));
+    }
+    datasetService.insertAll(destination, rows, null);
+  }
+
+  private void runCopyJob(JobConfigurationTableCopy copy)
+      throws InterruptedException, IOException {
+    List<TableReference> sources = copy.getSourceTables();
+    TableReference destination = copy.getDestinationTable();
+    WriteDisposition writeDisposition = WriteDisposition.valueOf(copy.getWriteDisposition());
+    CreateDisposition createDisposition = CreateDisposition.valueOf(copy.getCreateDisposition());
+    Table existingTable = datasetService.getTable(destination);
+    validateDispositions(existingTable, createDisposition, writeDisposition);
+
+    List<TableRow> allRows = Lists.newArrayList();
+    for (TableReference source : sources) {
+      allRows.addAll(datasetService.getAllRows(
+          source.getProjectId(), source.getDatasetId(), source.getTableId()));
+    }
+    datasetService.insertAll(destination, allRows, null);
+  }
+
+  private void runExtractJob(Job job, JobConfigurationExtract extract) {
+    // This fake does not write any extract output; it only records a destination file count
+    // of zero in the job statistics.
+    List<Long> destinationFileCounts = Lists.newArrayList(0L);
+    job.setStatistics(new JobStatistics().setExtract(
+        new JobStatistics4().setDestinationUriFileCounts(destinationFileCounts)));
+  }
+
+  private List<TableRow> readRows(String filename) throws IOException {
+    Coder<TableRow> coder = TableRowJsonCoder.of();
+    List<TableRow> tableRows = Lists.newArrayList();
+    try (BufferedReader reader = new BufferedReader(new FileReader(filename))) {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        TableRow tableRow = coder.decode(
+            new ByteArrayInputStream(line.getBytes(StandardCharsets.UTF_8)), Context.OUTER);
+        tableRows.add(tableRow);
+      }
+    }
+    return tableRows;
+  }
+}
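
A minimal sketch of the job lifecycle (illustrative only, not part of the commit): because of
GET_JOBS_TRANSITION_INTERVAL, a started job is reported PENDING for the first five getJob
calls, RUNNING for the next five, and is executed and reported DONE only on the eleventh call,
so tests poll rather than asserting on the first status:

  // Hypothetical test snippet; the job reference is illustrative and loadConfig is assumed to
  // be a JobConfigurationLoad prepared elsewhere by the test.
  FakeJobService jobService = new FakeJobService();
  JobReference jobRef = new JobReference().setProjectId("project").setJobId("job-0");
  jobService.startLoadJob(jobRef, loadConfig);
  Job done = jobService.pollJob(jobRef, 20);  // retries getJob until the fake reports DONE
  assertEquals("DONE", done.getStatus().getState());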

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
new file mode 100644
index 0000000..b2fc170
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
@@ -0,0 +1,36 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableRow;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holds a BigQuery {@link Table} together with the rows and insert ids that have been
+ * written to it.
+ */
+class TableContainer {
+  Table table;
+  List<TableRow> rows;
+  List<String> ids;
+
+  TableContainer(Table table) {
+    this.table = table;
+    this.rows = new ArrayList<>();
+    this.ids = new ArrayList<>();
+  }
+
+  TableContainer addRow(TableRow row, String id) {
+    rows.add(row);
+    ids.add(id);
+    return this;
+  }
+
+  Table getTable() {
+    return table;
+  }
+
+  List<TableRow> getRows() {
+    return rows;
+  }
+}
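
A tiny sketch of the container itself (illustrative only, not part of the commit; ref is a
TableReference as in the earlier sketch):

  // Hypothetical test snippet.
  TableContainer container = new TableContainer(new Table().setTableReference(ref));
  container.addRow(new TableRow().set("name", "a"), "id-1");
  assertEquals(1, container.getRows().size());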