You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by ch...@apache.org on 2019/03/14 00:59:15 UTC
[beam] branch master updated: Report estimated size as source metadata when splitting.
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e4ec725 Report estimated size as source metadata when splitting.
new 0fa4e20 Merge pull request #7922: [BEAM-6819] Report estimated size as source metadata when splitting
e4ec725 is described below
commit e4ec725a574b8d7f19537bfeb2c9926c1a81d80d
Author: Sunil Pedapudi <sk...@gmail.com>
AuthorDate: Thu Feb 21 20:26:55 2019 -0800
Report estimated size as source metadata when splitting.
---
.../dataflow/worker/WorkerCustomSources.java | 14 ++++++--
.../dataflow/worker/WorkerCustomSourcesTest.java | 1 +
.../java/org/apache/beam/sdk/io/AvroSource.java | 30 +++++++++++++++++
.../org/apache/beam/sdk/io/AvroSourceTest.java | 19 +++++++++++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 3 +-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 38 +++++++++++++++++++---
6 files changed, 97 insertions(+), 8 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 71eab1b..3958497 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -32,6 +32,7 @@ import com.google.api.services.dataflow.model.ApproximateSplitRequest;
import com.google.api.services.dataflow.model.DerivedSource;
import com.google.api.services.dataflow.model.DynamicSourceSplit;
import com.google.api.services.dataflow.model.ReportedParallelism;
+import com.google.api.services.dataflow.model.SourceMetadata;
import com.google.api.services.dataflow.model.SourceOperationResponse;
import com.google.api.services.dataflow.model.SourceSplitOptions;
import com.google.api.services.dataflow.model.SourceSplitRequest;
@@ -55,6 +56,7 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.WindowedValue;
@@ -133,9 +135,7 @@ public class WorkerCustomSources {
/**
* Version of {@link CustomSources#serializeToCloudSource(Source, PipelineOptions)} intended for
- * use on splits of {@link BoundedSource}: the backend only needs the metadata for top-level
- * sources, so here we bypass computing it (esp. the costly {@link
- * BoundedSource#getEstimatedSizeBytes(PipelineOptions)}).
+ * use on splits of {@link BoundedSource}.
*/
private static com.google.api.services.dataflow.model.Source serializeSplitToCloudSource(
BoundedSource<?> source) throws Exception {
@@ -144,6 +144,14 @@ public class WorkerCustomSources {
cloudSource.setSpec(CloudObject.forClass(CustomSources.class));
addString(
cloudSource.getSpec(), SERIALIZED_SOURCE, encodeBase64String(serializeToByteArray(source)));
+ SourceMetadata metadata = new SourceMetadata();
+ // Size estimation is best effort so we continue even if it fails here.
+ try {
+ metadata.setEstimatedSizeBytes(source.getEstimatedSizeBytes(PipelineOptionsFactory.create()));
+ } catch (Exception e) {
+ LOG.warn("Size estimation of the source failed: " + source, e);
+ }
+ cloudSource.setMetadata(metadata);
return cloudSource;
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index 73dfad9..4205e85 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -152,6 +152,7 @@ public class WorkerCustomSourcesTest {
"Failed on bundle " + i,
xs,
contains(valueInGlobalWindow(0L + 2 * i), valueInGlobalWindow(1L + 2 * i)));
+ assertTrue(bundle.getSource().getMetadata().getEstimatedSizeBytes() > 0);
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 67faa35..1f31b02 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static org.apache.beam.sdk.io.FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
@@ -215,6 +216,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
}
+ public static AvroSource<GenericRecord> from(Metadata metadata) {
+ return new AvroSource<>(
+ metadata,
+ DEFAULT_MIN_BUNDLE_SIZE,
+ 0,
+ metadata.sizeBytes(),
+ readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
+ }
+
/** Like {@link #from(ValueProvider)}. */
public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
@@ -244,6 +254,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
/** Reads files containing records of the given class. */
public <X> AvroSource<X> withSchema(Class<X> clazz) {
checkArgument(clazz != null, "clazz can not be null");
+ if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
+ return new AvroSource<>(
+ getSingleFileMetadata(),
+ getMinBundleSize(),
+ getStartOffset(),
+ getEndOffset(),
+ readGeneratedClasses(clazz));
+ }
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getEmptyMatchTreatment(),
@@ -259,6 +277,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {
checkArgument(parseFn != null, "parseFn can not be null");
checkArgument(coder != null, "coder can not be null");
+ if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
+ return new AvroSource<>(
+ getSingleFileMetadata(),
+ getMinBundleSize(),
+ getStartOffset(),
+ getEndOffset(),
+ parseGenericRecords(parseFn, coder));
+ }
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getEmptyMatchTreatment(),
@@ -271,6 +297,10 @@ public class AvroSource<T> extends BlockBasedSource<T> {
* minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
+ if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
+ return new AvroSource<>(
+ getSingleFileMetadata(), minBundleSize, getStartOffset(), getEndOffset(), mode);
+ }
return new AvroSource<>(
getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index 89ea347..4e0bce3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -756,6 +756,25 @@ public class AvroSourceTest {
assertEquals(4, schema.getFields().size());
}
+ @Test
+ public void testCreateFromMetadata() throws Exception {
+ List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
+ String codec = DataFileConstants.NULL_CODEC;
+ String filename =
+ generateTestFile(
+ codec, expected, SyncBehavior.SYNC_DEFAULT, 0, AvroCoder.of(Bird.class), codec);
+ Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
+
+ AvroSource<GenericRecord> source = AvroSource.from(fileMeta);
+ AvroSource<Bird> sourceWithSchema = source.withSchema(Bird.class);
+ AvroSource<Bird> sourceWithSchemaWithMinBundleSize = sourceWithSchema.withMinBundleSize(1234);
+
+ assertEquals(FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, source.getMode());
+ assertEquals(FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, sourceWithSchema.getMode());
+ assertEquals(
+ FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, sourceWithSchemaWithMinBundleSize.getMode());
+ }
+
/**
* Class that will encode to a fixed size: 16 bytes.
*
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e694dfe..5d4e33a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -870,7 +870,8 @@ public class BigQueryIO {
ImmutableList.of(
FileSystems.matchNewResource(
c.element(), false /* is directory */)),
- schema);
+ schema,
+ null);
checkArgument(sources.size() == 1, "Expected exactly one source.");
BoundedSource<T> avroSource = sources.get(0);
BoundedSource.BoundedReader<T> reader =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 12599fe..14c9bf3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static org.apache.beam.sdk.io.FileSystems.match;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
@@ -36,6 +37,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
@@ -90,10 +92,17 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
protected static class ExtractResult {
public final TableSchema schema;
public final List<ResourceId> extractedFiles;
+ public List<MatchResult.Metadata> metadata = null;
public ExtractResult(TableSchema schema, List<ResourceId> extractedFiles) {
+ this(schema, extractedFiles, null);
+ }
+
+ public ExtractResult(
+ TableSchema schema, List<ResourceId> extractedFiles, List<MatchResult.Metadata> metadata) {
this.schema = schema;
this.extractedFiles = extractedFiles;
+ this.metadata = metadata;
}
}
@@ -138,8 +147,19 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
if (cachedSplitResult == null) {
ExtractResult res = extractFiles(options);
LOG.info("Extract job produced {} files", res.extractedFiles.size());
+
+ if (res.extractedFiles.size() > 0) {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ final String extractDestinationDir =
+ resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
+ // Match all files in the destination directory to stat them in bulk.
+ List<MatchResult> matches = match(ImmutableList.of(extractDestinationDir + "*"));
+ if (matches.size() > 0) {
+ res.metadata = matches.get(0).metadata();
+ }
+ }
cleanupTempResource(options.as(BigQueryOptions.class));
- cachedSplitResult = checkNotNull(createSources(res.extractedFiles, res.schema));
+ cachedSplitResult = checkNotNull(createSources(res.extractedFiles, res.schema, res.metadata));
}
return cachedSplitResult;
}
@@ -206,7 +226,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
}
}
- List<BoundedSource<T>> createSources(List<ResourceId> files, TableSchema schema)
+ List<BoundedSource<T>> createSources(
+ List<ResourceId> files, TableSchema schema, List<MatchResult.Metadata> metadata)
throws IOException, InterruptedException {
final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema);
@@ -221,9 +242,18 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
return parseFn.apply(new SchemaAndRecord(input, schema.get()));
}
};
+
List<BoundedSource<T>> avroSources = Lists.newArrayList();
- for (ResourceId file : files) {
- avroSources.add(AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
+ // If metadata is available, create AvroSources with said metadata in SINGLE_FILE_OR_SUBRANGE
+ // mode.
+ if (metadata != null) {
+ for (MatchResult.Metadata file : metadata) {
+ avroSources.add(AvroSource.from(file).withParseFn(fnWrapper, getOutputCoder()));
+ }
+ } else {
+ for (ResourceId file : files) {
+ avroSources.add(AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
+ }
}
return ImmutableList.copyOf(avroSources);
}