Posted to commits@beam.apache.org by ch...@apache.org on 2019/03/14 00:59:15 UTC

[beam] branch master updated: Report estimated size as source metadata when splitting.

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e4ec725  Report estimated size as source metadata when splitting.
     new 0fa4e20  Merge pull request #7922: [BEAM-6819] Report estimated size as source metadata when splitting
e4ec725 is described below

commit e4ec725a574b8d7f19537bfeb2c9926c1a81d80d
Author: Sunil Pedapudi <sk...@gmail.com>
AuthorDate: Thu Feb 21 20:26:55 2019 -0800

    Report estimated size as source metadata when splitting.
---
 .../dataflow/worker/WorkerCustomSources.java       | 14 ++++++--
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  1 +
 .../java/org/apache/beam/sdk/io/AvroSource.java    | 30 +++++++++++++++++
 .../org/apache/beam/sdk/io/AvroSourceTest.java     | 19 +++++++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  3 +-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java    | 38 +++++++++++++++++++---
 6 files changed, 97 insertions(+), 8 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 71eab1b..3958497 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -32,6 +32,7 @@ import com.google.api.services.dataflow.model.ApproximateSplitRequest;
 import com.google.api.services.dataflow.model.DerivedSource;
 import com.google.api.services.dataflow.model.DynamicSourceSplit;
 import com.google.api.services.dataflow.model.ReportedParallelism;
+import com.google.api.services.dataflow.model.SourceMetadata;
 import com.google.api.services.dataflow.model.SourceOperationResponse;
 import com.google.api.services.dataflow.model.SourceSplitOptions;
 import com.google.api.services.dataflow.model.SourceSplitRequest;
@@ -55,6 +56,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -133,9 +135,7 @@ public class WorkerCustomSources {
 
   /**
    * Version of {@link CustomSources#serializeToCloudSource(Source, PipelineOptions)} intended for
-   * use on splits of {@link BoundedSource}: the backend only needs the metadata for top-level
-   * sources, so here we bypass computing it (esp. the costly {@link
-   * BoundedSource#getEstimatedSizeBytes(PipelineOptions)}).
+   * use on splits of {@link BoundedSource}.
    */
   private static com.google.api.services.dataflow.model.Source serializeSplitToCloudSource(
       BoundedSource<?> source) throws Exception {
@@ -144,6 +144,14 @@ public class WorkerCustomSources {
     cloudSource.setSpec(CloudObject.forClass(CustomSources.class));
     addString(
         cloudSource.getSpec(), SERIALIZED_SOURCE, encodeBase64String(serializeToByteArray(source)));
+    SourceMetadata metadata = new SourceMetadata();
+    // Size estimation is best effort, so we continue even if it fails here.
+    try {
+      metadata.setEstimatedSizeBytes(source.getEstimatedSizeBytes(PipelineOptionsFactory.create()));
+    } catch (Exception e) {
+      LOG.warn("Size estimation of the source failed: " + source, e);
+    }
+    cloudSource.setMetadata(metadata);
     return cloudSource;
   }
 
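The trade-off in the hunk above: BoundedSource#getEstimatedSizeBytes, which the old javadoc deliberately bypassed as costly, is now called once per split, guarded so a failure only logs a warning. A source whose estimate is cheap keeps this path inexpensive; a minimal sketch of such an override for a hypothetical fixed-width source (numRecords and recordSizeBytes are illustrative fields, not part of this commit):

    // Sketch: a BoundedSource whose size estimate is O(1), so the new
    // per-split call in serializeSplitToCloudSource stays cheap.
    @Override
    public long getEstimatedSizeBytes(PipelineOptions options) {
      // Hypothetical fixed-width layout: record count times record width.
      return numRecords * recordSizeBytes;
    }
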
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index 73dfad9..4205e85 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -152,6 +152,7 @@ public class WorkerCustomSourcesTest {
           "Failed on bundle " + i,
           xs,
           contains(valueInGlobalWindow(0L + 2 * i), valueInGlobalWindow(1L + 2 * i)));
+      assertTrue(bundle.getSource().getMetadata().getEstimatedSizeBytes() > 0);
     }
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 67faa35..1f31b02 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.apache.beam.sdk.io.FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
@@ -215,6 +216,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
         readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
   }
 
+  public static AvroSource<GenericRecord> from(Metadata metadata) {
+    return new AvroSource<>(
+        metadata,
+        DEFAULT_MIN_BUNDLE_SIZE,
+        0,
+        metadata.sizeBytes(),
+        readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
+  }
+
   /** Like {@link #from(ValueProvider)}. */
   public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
     return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
@@ -244,6 +254,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   /** Reads files containing records of the given class. */
   public <X> AvroSource<X> withSchema(Class<X> clazz) {
     checkArgument(clazz != null, "clazz can not be null");
+    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
+      return new AvroSource<>(
+          getSingleFileMetadata(),
+          getMinBundleSize(),
+          getStartOffset(),
+          getEndOffset(),
+          readGeneratedClasses(clazz));
+    }
     return new AvroSource<>(
         getFileOrPatternSpecProvider(),
         getEmptyMatchTreatment(),
@@ -259,6 +277,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {
     checkArgument(parseFn != null, "parseFn can not be null");
     checkArgument(coder != null, "coder can not be null");
+    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
+      return new AvroSource<>(
+          getSingleFileMetadata(),
+          getMinBundleSize(),
+          getStartOffset(),
+          getEndOffset(),
+          parseGenericRecords(parseFn, coder));
+    }
     return new AvroSource<>(
         getFileOrPatternSpecProvider(),
         getEmptyMatchTreatment(),
@@ -271,6 +297,10 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
+    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
+      return new AvroSource<>(
+          getSingleFileMetadata(), minBundleSize, getStartOffset(), getEndOffset(), mode);
+    }
     return new AvroSource<>(
         getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);
   }
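
Taken together, the AvroSource changes add a Metadata-based entry point and make the fluent with* methods preserve single-file mode instead of reverting to a file-pattern source. A hedged usage sketch (the path and bundle size are illustrative; error handling elided):

    // Sketch: build a single-file AvroSource from pre-fetched file metadata,
    // avoiding a second match/stat of the file.
    Metadata metadata = FileSystems.matchSingleFileSpec("gs://bucket/path/file.avro");
    AvroSource<GenericRecord> source =
        AvroSource.from(metadata).withMinBundleSize(1 << 20);
    // With this commit, source.getMode() stays SINGLE_FILE_OR_SUBRANGE.
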
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index 89ea347..4e0bce3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -756,6 +756,25 @@ public class AvroSourceTest {
     assertEquals(4, schema.getFields().size());
   }
 
+  @Test
+  public void testCreateFromMetadata() throws Exception {
+    List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
+    String codec = DataFileConstants.NULL_CODEC;
+    String filename =
+        generateTestFile(
+            codec, expected, SyncBehavior.SYNC_DEFAULT, 0, AvroCoder.of(Bird.class), codec);
+    Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
+
+    AvroSource<GenericRecord> source = AvroSource.from(fileMeta);
+    AvroSource<Bird> sourceWithSchema = source.withSchema(Bird.class);
+    AvroSource<Bird> sourceWithSchemaWithMinBundleSize = sourceWithSchema.withMinBundleSize(1234);
+
+    assertEquals(FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, source.getMode());
+    assertEquals(FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, sourceWithSchema.getMode());
+    assertEquals(
+        FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, sourceWithSchemaWithMinBundleSize.getMode());
+  }
+
   /**
    * Class that will encode to a fixed size: 16 bytes.
    *
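
The new test asserts only the mode; verifying that a metadata-constructed source still reads its file could reuse the helpers the rest of this file uses, e.g. (a hedged sketch, assuming SourceTestUtils and the Bird equality the file's other assertions rely on):

    // Sketch: read back the records from the metadata-constructed source.
    List<Bird> actual =
        SourceTestUtils.readFromSource(sourceWithSchema, PipelineOptionsFactory.create());
    assertThat(actual, containsInAnyOrder(expected.toArray()));
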
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e694dfe..5d4e33a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -870,7 +870,8 @@ public class BigQueryIO {
                                         ImmutableList.of(
                                             FileSystems.matchNewResource(
                                                 c.element(), false /* is directory */)),
-                                        schema);
+                                        schema,
+                                        null);
                                 checkArgument(sources.size() == 1, "Expected exactly one source.");
                                 BoundedSource<T> avroSource = sources.get(0);
                                 BoundedSource.BoundedReader<T> reader =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 12599fe..14c9bf3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static org.apache.beam.sdk.io.FileSystems.match;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
@@ -36,6 +37,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
@@ -90,10 +92,17 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
   protected static class ExtractResult {
     public final TableSchema schema;
     public final List<ResourceId> extractedFiles;
+    public List<MatchResult.Metadata> metadata = null;
 
     public ExtractResult(TableSchema schema, List<ResourceId> extractedFiles) {
+      this(schema, extractedFiles, null);
+    }
+
+    public ExtractResult(
+        TableSchema schema, List<ResourceId> extractedFiles, List<MatchResult.Metadata> metadata) {
       this.schema = schema;
       this.extractedFiles = extractedFiles;
+      this.metadata = metadata;
     }
   }
 
@@ -138,8 +147,19 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
     if (cachedSplitResult == null) {
       ExtractResult res = extractFiles(options);
       LOG.info("Extract job produced {} files", res.extractedFiles.size());
+
+      if (res.extractedFiles.size() > 0) {
+        BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+        final String extractDestinationDir =
+            resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
+        // Match all files in the destination directory to stat them in bulk.
+        List<MatchResult> matches = match(ImmutableList.of(extractDestinationDir + "*"));
+        if (matches.size() > 0) {
+          res.metadata = matches.get(0).metadata();
+        }
+      }
       cleanupTempResource(options.as(BigQueryOptions.class));
-      cachedSplitResult = checkNotNull(createSources(res.extractedFiles, res.schema));
+      cachedSplitResult = checkNotNull(createSources(res.extractedFiles, res.schema, res.metadata));
     }
     return cachedSplitResult;
   }
@@ -206,7 +226,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
     }
   }
 
-  List<BoundedSource<T>> createSources(List<ResourceId> files, TableSchema schema)
+  List<BoundedSource<T>> createSources(
+      List<ResourceId> files, TableSchema schema, List<MatchResult.Metadata> metadata)
       throws IOException, InterruptedException {
 
     final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema);
@@ -221,9 +242,18 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
             return parseFn.apply(new SchemaAndRecord(input, schema.get()));
           }
         };
+
     List<BoundedSource<T>> avroSources = Lists.newArrayList();
-    for (ResourceId file : files) {
-      avroSources.add(AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
+    // If metadata is available, create AvroSources with said metadata in SINGLE_FILE_OR_SUBRANGE
+    // mode.
+    if (metadata != null) {
+      for (MatchResult.Metadata file : metadata) {
+        avroSources.add(AvroSource.from(file).withParseFn(fnWrapper, getOutputCoder()));
+      }
+    } else {
+      for (ResourceId file : files) {
+        avroSources.add(AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
+      }
     }
     return ImmutableList.copyOf(avroSources);
   }
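
Net effect in BigQuerySourceBase: the extract directory is stat'd once with a bulk match, and the resulting Metadata, file size included, is threaded through createSources so each AvroSource starts in single-file mode with a known size. A condensed sketch of the flow (variable names are illustrative):

    // Sketch: one bulk match for all extracted files, then sources that
    // already carry their size instead of re-statting per file.
    List<MatchResult> matches = FileSystems.match(ImmutableList.of(extractDir + "*"));
    for (MatchResult.Metadata file : matches.get(0).metadata()) {
      sources.add(AvroSource.from(file).withParseFn(parseFn, coder));
    }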