You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/29 21:26:17 UTC

[1/2] beam git commit: Add support for TimePartitioning in BigQueryIO.write().

Repository: beam
Updated Branches:
  refs/heads/master 1c26b7488 -> 6280d497b


Add support for TimePartitioning in BigQueryIO.write().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0e03a33
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0e03a33
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0e03a33

Branch: refs/heads/master
Commit: b0e03a33cf0c2c573a2d34d88506e19ebb28c934
Parents: 1c26b74
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Sun Jul 30 21:42:59 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 29 14:05:15 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 ...aultCoderCloudObjectTranslatorRegistrar.java |  2 +
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  4 +-
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    |  8 +++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 47 ++++++++++++++++
 .../beam/sdk/io/gcp/bigquery/CreateTables.java  | 15 +++--
 .../bigquery/DynamicDestinationsHelpers.java    | 27 ++++++++-
 .../sdk/io/gcp/bigquery/TableDestination.java   | 39 ++++++++++++-
 .../io/gcp/bigquery/TableDestinationCoder.java  |  2 +
 .../gcp/bigquery/TableDestinationCoderV2.java   | 59 ++++++++++++++++++++
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |  7 ++-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 50 +++++++++++++++++
 .../sdk/io/gcp/bigquery/FakeJobService.java     | 32 +++++++++--
 .../sdk/io/gcp/bigquery/TableContainer.java     |  2 +
 14 files changed, 278 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 81c8003..b563f8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
     <apex.kryo.version>2.24.0</apex.kryo.version>
     <api-common.version>1.0.0-rc2</api-common.version>
     <avro.version>1.8.2</avro.version>
-    <bigquery.version>v2-rev295-1.22.0</bigquery.version>
+    <bigquery.version>v2-rev355-1.22.0</bigquery.version>
     <bigtable.version>0.9.7.1</bigtable.version>
     <cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
     <pubsubgrpc.version>0.1.0</pubsubgrpc.version>

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 5d42a5f..ff89933 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.TextualIntegerCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoderV2;
 import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
 
 /**
@@ -97,6 +98,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
           RandomAccessDataCoder.class,
           StringUtf8Coder.class,
           TableDestinationCoder.class,
+          TableDestinationCoderV2.class,
           TableRowJsonCoder.class,
           TextualIntegerCoder.class,
           VarIntCoder.class,

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 0a1306d..76cf7e8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -266,7 +266,7 @@ class BatchLoads<DestinationT>
         .apply(WithKeys.<Void, KV<TableDestination, String>>of((Void) null))
         .setCoder(
             KvCoder.of(
-                VoidCoder.of(), KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())))
+                VoidCoder.of(), KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
         .apply(GroupByKey.<Void, KV<TableDestination, String>>create())
         .apply(Values.<Iterable<KV<TableDestination, String>>>create())
         .apply(
@@ -323,7 +323,7 @@ class BatchLoads<DestinationT>
 
     tempTables
         .apply("ReifyRenameInput", new ReifyAsIterable<KV<TableDestination, String>>())
-        .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())))
+        .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
         .apply(
             "WriteRenameUntriggered",
             ParDo.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 78dcdde..7f9e27a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.hash.Hashing;
@@ -291,6 +292,13 @@ public class BigQueryHelpers {
     }
   }
 
+  static class TimePartitioningToJson implements SerializableFunction<TimePartitioning, String> {
+    @Override
+    public String apply(TimePartitioning partitioning) {
+      return toJsonString(partitioning);
+    }
+  }
+
   static String createJobIdToken(String jobName, String stepUuid) {
     return String.format("beam_job_%s_%s", stepUuid, jobName.replaceAll("-", ""));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index feb085d..29828e4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -31,6 +31,7 @@ import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
@@ -60,9 +61,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRe
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TimePartitioningToJson;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations;
+import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -824,6 +827,7 @@ public class BigQueryIO {
     @Nullable abstract DynamicDestinations<T, ?> getDynamicDestinations();
     @Nullable abstract PCollectionView<Map<String, String>> getSchemaFromView();
     @Nullable abstract ValueProvider<String> getJsonSchema();
+    @Nullable abstract ValueProvider<String> getJsonTimePartitioning();
     abstract CreateDisposition getCreateDisposition();
     abstract WriteDisposition getWriteDisposition();
     /** Table description. Default is empty. */
@@ -854,6 +858,7 @@ public class BigQueryIO {
       abstract Builder<T> setDynamicDestinations(DynamicDestinations<T, ?> dynamicDestinations);
       abstract Builder<T> setSchemaFromView(PCollectionView<Map<String, String>> view);
       abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema);
+      abstract Builder<T> setJsonTimePartitioning(ValueProvider<String> jsonTimePartitioning);
       abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition);
       abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
       abstract Builder<T> setTableDescription(String tableDescription);
@@ -1022,6 +1027,33 @@ public class BigQueryIO {
       return toBuilder().setSchemaFromView(view).build();
     }
 
+    /**
+     * Allows newly created tables to include a {@link TimePartitioning} class. Can only be used
+     * when writing to a single table. If {@link #to(SerializableFunction)} or
+     * {@link #to(DynamicDestinations)} is used to write dynamic tables, time partitioning can be
+     * set directly in the returned {@link TableDestination}.
+     */
+    public Write<T> withTimePartitioning(TimePartitioning partitioning) {
+      return withJsonTimePartitioning(
+          StaticValueProvider.of(BigQueryHelpers.toJsonString(partitioning)));
+    }
+
+    /**
+     * Like {@link #withTimePartitioning(TimePartitioning)} but using a deferred
+     * {@link ValueProvider}.
+     */
+    public Write<T> withTimePartitioning(ValueProvider<TimePartitioning> partition) {
+      return withJsonTimePartitioning(NestedValueProvider.of(
+          partition, new TimePartitioningToJson()));
+    }
+
+    /**
+     * The same as {@link #withTimePartitioning}, but takes a JSON-serialized object.
+     */
+    public Write<T> withJsonTimePartitioning(ValueProvider<String> partition) {
+      return toBuilder().setJsonTimePartitioning(partition).build();
+    }
+
     /** Specifies whether the table should be created if it does not exist. */
     public Write<T> withCreateDisposition(CreateDisposition createDisposition) {
       return toBuilder().setCreateDisposition(createDisposition).build();
@@ -1183,6 +1215,15 @@ public class BigQueryIO {
             input.isBounded(),
             method);
       }
+      if (getJsonTimePartitioning() != null) {
+        checkArgument(getDynamicDestinations() == null,
+            "The supplied DynamicDestinations object can directly set TimePartitioning."
+                + " There is no need to call BigQueryIO.Write.withTimePartitioning.");
+        checkArgument(getTableFunction() == null,
+            "The supplied getTableFunction object can directly set TimePartitioning."
+                + " There is no need to call BigQueryIO.Write.withTimePartitioning.");
+      }
+
       DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
       if (dynamicDestinations == null) {
         if (getJsonTableRef() != null) {
@@ -1205,6 +1246,12 @@ public class BigQueryIO {
                   (DynamicDestinations<T, TableDestination>) dynamicDestinations,
                   getSchemaFromView());
         }
+
+        // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
+        if (getJsonTimePartitioning() != null) {
+          dynamicDestinations = new ConstantTimePartitioningDestinations(
+              dynamicDestinations, getJsonTimePartitioning());
+        }
       }
       return expandTyped(input, dynamicDestinations);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index 3dc10b0..7f83b83 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -73,7 +73,7 @@ public class CreateTables<DestinationT>
   }
 
   CreateTables<DestinationT> withTestServices(BigQueryServices bqServices) {
-    return new CreateTables<DestinationT>(createDisposition, bqServices, dynamicDestinations);
+    return new CreateTables<>(createDisposition, bqServices, dynamicDestinations);
   }
 
   @Override
@@ -124,11 +124,14 @@ public class CreateTables<DestinationT>
         DatasetService datasetService = bqServices.getDatasetService(options);
         if (!createdTables.contains(tableSpec)) {
           if (datasetService.getTable(tableReference) == null) {
-            datasetService.createTable(
-                new Table()
-                    .setTableReference(tableReference)
-                    .setSchema(tableSchema)
-                    .setDescription(tableDescription));
+            Table table = new Table()
+                .setTableReference(tableReference)
+                .setSchema(tableSchema)
+                .setDescription(tableDescription);
+            if (tableDestination.getTimePartitioning() != null) {
+              table.setTimePartitioning(tableDestination.getTimePartitioning());
+            }
+            datasetService.createTable(table);
           }
           createdTables.add(tableSpec);
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index 530e2b6..818ea34 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -108,7 +108,7 @@ class DynamicDestinationsHelpers {
 
     @Override
     public Coder<TableDestination> getDestinationCoder() {
-      return TableDestinationCoder.of();
+      return TableDestinationCoderV2.of();
     }
   }
 
@@ -164,6 +164,31 @@ class DynamicDestinationsHelpers {
     }
   }
 
+  static class ConstantTimePartitioningDestinations<T>
+      extends DelegatingDynamicDestinations<T, TableDestination> {
+
+    @Nullable
+    private final ValueProvider<String> jsonTimePartitioning;
+
+    ConstantTimePartitioningDestinations(DynamicDestinations<T, TableDestination> inner,
+        ValueProvider<String> jsonTimePartitioning) {
+      super(inner);
+      this.jsonTimePartitioning = jsonTimePartitioning;
+    }
+
+    @Override
+    public TableDestination getDestination(ValueInSingleWindow<T> element) {
+      TableDestination destination = super.getDestination(element);
+      return new TableDestination(destination.getTableSpec(), destination.getTableDescription(),
+          jsonTimePartitioning.get());
+    }
+
+    @Override
+    public Coder<TableDestination> getDestinationCoder() {
+      return TableDestinationCoderV2.of();
+    }
+  }
+
   /**
    * Takes in a side input mapping tablespec to json table schema, and always returns the
    * matching schema from the side input.

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index ecf35d8..79f1b22 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -19,6 +19,7 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TimePartitioning;
 import java.io.Serializable;
 import java.util.Objects;
 import javax.annotation.Nullable;
@@ -31,18 +32,38 @@ public class TableDestination implements Serializable {
   private final String tableSpec;
   @Nullable
   private final String tableDescription;
+  @Nullable
+  private final String jsonTimePartitioning;
 
 
   public TableDestination(String tableSpec, @Nullable String tableDescription) {
-    this.tableSpec = tableSpec;
-    this.tableDescription = tableDescription;
+    this(tableSpec, tableDescription, (String) null);
   }
 
   public TableDestination(TableReference tableReference, @Nullable String tableDescription) {
-    this.tableSpec = BigQueryHelpers.toTableSpec(tableReference);
+    this(tableReference, tableDescription, null);
+  }
+
+  /** Like the two-argument form, with an optional (possibly null) {@link TimePartitioning}. */
+  public TableDestination(TableReference tableReference, @Nullable String tableDescription,
+      @Nullable TimePartitioning timePartitioning) {
+    this(BigQueryHelpers.toTableSpec(tableReference), tableDescription, timePartitioning);
+  }
+
+  public TableDestination(String tableSpec, @Nullable String tableDescription,
+      @Nullable TimePartitioning timePartitioning) {
+    this(tableSpec, tableDescription, // a null partitioning is stored as null JSON
+        timePartitioning != null ? BigQueryHelpers.toJsonString(timePartitioning) : null);
+  }
+
+  public TableDestination(String tableSpec, @Nullable String tableDescription,
+      @Nullable String jsonTimePartitioning) {
+    this.tableSpec = tableSpec;
     this.tableDescription = tableDescription;
+    this.jsonTimePartitioning = jsonTimePartitioning;
   }
 
+
   public String getTableSpec() {
     return tableSpec;
   }
@@ -51,6 +72,18 @@ public class TableDestination implements Serializable {
     return BigQueryHelpers.parseTableSpec(tableSpec);
   }
 
+  @Nullable
+  public String getJsonTimePartitioning() {
+    return jsonTimePartitioning;
+  }
+
+  /** Returns the time partitioning parsed from its JSON form, or null if none was set. */
+  @Nullable
+  public TimePartitioning getTimePartitioning() {
+    return jsonTimePartitioning == null ? null
+        : BigQueryHelpers.fromJsonString(jsonTimePartitioning, TimePartitioning.class);
+  }
+
   @Nullable
   public String getTableDescription() {
     return tableDescription;

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index f034a03..2bfc2ca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -33,6 +33,8 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> {
   private static final Coder<String> tableSpecCoder = StringUtf8Coder.of();
   private static final Coder<String> tableDescriptionCoder = NullableCoder.of(StringUtf8Coder.of());
 
+  private TableDestinationCoder() {}
+
   public static TableDestinationCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java
new file mode 100644
index 0000000..5bdab0d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * A {@link Coder} for {@link TableDestination} that includes time partitioning information. This
+ * is a new coder (instead of extending the old {@link TableDestinationCoder}) for compatibility
+ * reasons. The old coder is kept around for the same compatibility reasons.
+ */
+public class TableDestinationCoderV2 extends AtomicCoder<TableDestination> {
+  private static final TableDestinationCoderV2 INSTANCE = new TableDestinationCoderV2();
+  private static final Coder<String> timePartitioningCoder = NullableCoder.of(StringUtf8Coder.of());
+
+  public static TableDestinationCoderV2 of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(TableDestination value, OutputStream outStream) throws IOException {
+    TableDestinationCoder.of().encode(value, outStream);
+    timePartitioningCoder.encode(value.getJsonTimePartitioning(), outStream);
+  }
+
+  @Override
+  public TableDestination decode(InputStream inStream) throws IOException {
+    TableDestination destination = TableDestinationCoder.of().decode(inStream);
+    String jsonTimePartitioning = timePartitioningCoder.decode(inStream);
+    return new TableDestination(
+        destination.getTableSpec(), destination.getTableDescription(), jsonTimePartitioning);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index c8fab75..a646f17 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -23,6 +23,7 @@ import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -135,6 +136,7 @@ class WriteTables<DestinationT>
         bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
         jobIdPrefix,
         tableReference,
+        tableDestination.getTimePartitioning(),
         tableSchema,
         partitionFiles,
         writeDisposition,
@@ -150,6 +152,7 @@ class WriteTables<DestinationT>
       DatasetService datasetService,
       String jobIdPrefix,
       TableReference ref,
+      TimePartitioning timePartitioning,
       @Nullable TableSchema schema,
       List<String> gcsUris,
       WriteDisposition writeDisposition,
@@ -164,7 +167,9 @@ class WriteTables<DestinationT>
             .setWriteDisposition(writeDisposition.name())
             .setCreateDisposition(createDisposition.name())
             .setSourceFormat("NEWLINE_DELIMITED_JSON");
-
+    if (timePartitioning != null) {
+      loadConfig.setTimePartitioning(timePartitioning);
+    }
     String projectId = ref.getProjectId();
     Job lastFailedLoadJob = null;
     for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 0ece3ee..18547cd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -47,6 +47,7 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
@@ -638,6 +639,55 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  public void testTimePartitioningStreamingInserts() throws Exception {
+    testTimePartitioning(Method.STREAMING_INSERTS);
+  }
+
+  @Test
+  public void testTimePartitioningBatchLoads() throws Exception {
+    testTimePartitioning(Method.FILE_LOADS);
+  }
+
+  /** Writes two rows with the given method and checks the created table's partitioning. */
+  public void testTimePartitioning(BigQueryIO.Write.Method insertMethod) throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeDatasetService datasetService = new FakeDatasetService();
+    FakeBigQueryServices fakeBqServices =
+        new FakeBigQueryServices()
+            .withJobService(new FakeJobService())
+            .withDatasetService(datasetService);
+    datasetService.createDataset("project-id", "dataset-id", "", "");
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    TableRow row1 = new TableRow().set("name", "a").set("number", "1");
+    TableRow row2 = new TableRow().set("name", "b").set("number", "2");
+
+    TimePartitioning timePartitioning = new TimePartitioning()
+        .setType("DAY")
+        .setExpirationMs(1000L);
+    TableSchema schema = new TableSchema().setFields(
+        ImmutableList.of(new TableFieldSchema().setName("number").setType("INTEGER")));
+    p.apply(Create.of(row1, row2))
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to("project-id:dataset-id.table-id")
+                .withTestServices(fakeBqServices)
+                .withMethod(insertMethod)
+                .withSchema(schema)
+                .withTimePartitioning(timePartitioning)
+                .withoutValidation());
+    p.run();
+    // The table created by the write must carry the requested schema and partitioning.
+    Table table = datasetService.getTable(
+        BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id"));
+    assertEquals(schema, table.getSchema());
+    assertEquals(timePartitioning, table.getTimePartitioning());
+  }
+
+  @Test
   @Category({ValidatesRunner.class, UsesTestStream.class})
   public void testTriggeredFileLoads() throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 7d5101d..cc600d1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -19,6 +19,7 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.util.BackOff;
@@ -39,6 +40,7 @@ import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -310,8 +312,13 @@ class FakeJobService implements JobService, Serializable {
     if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
       return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
     }
-
-    datasetService.createTable(new Table().setTableReference(destination).setSchema(schema));
+    if (existingTable == null) {
+      existingTable = new Table().setTableReference(destination).setSchema(schema);
+      if (load.getTimePartitioning() != null) {
+        existingTable = existingTable.setTimePartitioning(load.getTimePartitioning());
+      }
+      datasetService.createTable(existingTable);
+    }
 
     List<TableRow> rows = Lists.newArrayList();
     for (ResourceId filename : sourceFiles) {
@@ -331,13 +338,30 @@ class FakeJobService implements JobService, Serializable {
     if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
       return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
     }
-
+    TimePartitioning partitioning = null;
+    TableSchema schema = null;
+    boolean first = true;
     List<TableRow> allRows = Lists.newArrayList();
     for (TableReference source : sources) {
+      Table table = checkNotNull(datasetService.getTable(source));
+      if (!first) {
+        if (partitioning != table.getTimePartitioning()) {
+          return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
+        }
+        if (schema != table.getSchema()) {
+          return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
+        }
+      }
+      partitioning = table.getTimePartitioning();
+      schema = table.getSchema();
+      first = false;
       allRows.addAll(datasetService.getAllRows(
           source.getProjectId(), source.getDatasetId(), source.getTableId()));
     }
-    datasetService.createTable(new Table().setTableReference(destination));
+    datasetService.createTable(new Table()
+        .setTableReference(destination)
+        .setSchema(schema)
+        .setTimePartitioning(partitioning));
     datasetService.insertAll(destination, allRows, null);
     return new JobStatus().setState("DONE");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
index 8915069..e016c98 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
@@ -32,6 +32,7 @@ class TableContainer {
   Long sizeBytes;
   TableContainer(Table table) {
     this.table = table;
+
     this.rows = new ArrayList<>();
     this.ids = new ArrayList<>();
     this.sizeBytes = 0L;
@@ -54,6 +55,7 @@ class TableContainer {
     return table;
   }
 
+
   List<TableRow> getRows() {
     return rows;
   }


[2/2] beam git commit: This closes #3663: [BEAM-2390] Add support for TimePartitioning in BigQueryIO

Posted by jk...@apache.org.
This closes #3663: [BEAM-2390] Add support for TimePartitioning in BigQueryIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6280d497
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6280d497
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6280d497

Branch: refs/heads/master
Commit: 6280d497b5264536a485993ccdf21034f2a1c3d8
Parents: 1c26b74 b0e03a3
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Aug 29 14:07:06 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 29 14:07:06 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 ...aultCoderCloudObjectTranslatorRegistrar.java |  2 +
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  4 +-
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    |  8 +++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 47 ++++++++++++++++
 .../beam/sdk/io/gcp/bigquery/CreateTables.java  | 15 +++--
 .../bigquery/DynamicDestinationsHelpers.java    | 27 ++++++++-
 .../sdk/io/gcp/bigquery/TableDestination.java   | 39 ++++++++++++-
 .../io/gcp/bigquery/TableDestinationCoder.java  |  2 +
 .../gcp/bigquery/TableDestinationCoderV2.java   | 59 ++++++++++++++++++++
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |  7 ++-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 50 +++++++++++++++++
 .../sdk/io/gcp/bigquery/FakeJobService.java     | 32 +++++++++--
 .../sdk/io/gcp/bigquery/TableContainer.java     |  2 +
 14 files changed, 278 insertions(+), 18 deletions(-)
----------------------------------------------------------------------