Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/26 00:51:50 UTC
[1/4] beam git commit: Introduces AvroIO.readAll() and readAllGenericRecords()
Repository: beam
Updated Branches:
refs/heads/master 71196ec9c -> d919394c7
Introduces AvroIO.readAll() and readAllGenericRecords()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee1bcbae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee1bcbae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee1bcbae
Branch: refs/heads/master
Commit: ee1bcbae08fb221e392175fbd0387594653d4a86
Parents: eaf0b36
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 21 14:09:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 25 17:36:49 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 132 +++++++++++++++++--
.../java/org/apache/beam/sdk/io/AvroUtils.java | 40 ++++++
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 65 ++++++++-
3 files changed, 223 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d308c85..f201114 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.BaseEncoding;
@@ -54,14 +56,17 @@ import org.apache.beam.sdk.values.PDone;
* {@link PTransform}s for reading and writing Avro files.
*
* <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using
- * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. See {@link
- * FileSystems} for information on supported file systems and filepatterns.
+ * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. Alternatively, if
+ * the filepatterns to be read are themselves in a {@link PCollection}, apply {@link #readAll}.
+ *
+ * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
*
* <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
* {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
* {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
* JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
- * schema.
+ * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link
+ * #readAllGenericRecords}.
*
* <p>For example:
*
@@ -79,6 +84,18 @@ import org.apache.beam.sdk.values.PDone;
* .from("gs://my_bucket/path/to/records-*.avro"));
* }</pre>
*
+ * <p>Reading from a {@link PCollection} of filepatterns:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> filepatterns = p.apply(...);
+ * PCollection<AvroAutoGenClass> records =
+ * filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
+ * PCollection<GenericRecord> genericRecords =
+ * filepatterns.apply(AvroIO.readAllGenericRecords(schema));
+ * }</pre>
+ *
* <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
* {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
* DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
@@ -133,6 +150,18 @@ public class AvroIO {
.build();
}
+ /** Like {@link #read}, but reads each filepattern in the input {@link PCollection}. */
+ public static <T> ReadAll<T> readAll(Class<T> recordClass) {
+ return new AutoValue_AvroIO_ReadAll.Builder<T>()
+ .setRecordClass(recordClass)
+ .setSchema(ReflectData.get().getSchema(recordClass))
+ // 64MB is a reasonable value that allows amortizing the cost of opening files,
+ // but is not so large as to exhaust a typical runner's maximum amount of output per
+ // ProcessElement call.
+ .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+ .build();
+ }
+
/** Reads Avro file(s) containing records of the specified schema. */
public static Read<GenericRecord> readGenericRecords(Schema schema) {
return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
@@ -142,6 +171,17 @@ public class AvroIO {
}
/**
+ * Like {@link #readGenericRecords(Schema)}, but reads each filepattern in the input {@link
+ * PCollection}.
+ */
+ public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
+ return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+ .setRecordClass(GenericRecord.class)
+ .setSchema(schema)
+ .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+ .build();
+ }
+
+ /**
* Reads Avro file(s) containing records of the specified schema. The schema is specified as a
* JSON-encoded string.
*/
@@ -150,6 +190,14 @@ public class AvroIO {
}
/**
+ * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link
+ * PCollection}.
+ */
+ public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
+ return readAllGenericRecords(new Schema.Parser().parse(schema));
+ }
+
+ /**
* Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
* pattern).
*/
@@ -217,14 +265,12 @@ public class AvroIO {
public PCollection<T> expand(PBegin input) {
checkNotNull(getFilepattern(), "filepattern");
checkNotNull(getSchema(), "schema");
-
- @SuppressWarnings("unchecked")
- AvroSource<T> source =
- getRecordClass() == GenericRecord.class
- ? (AvroSource<T>) AvroSource.from(getFilepattern()).withSchema(getSchema())
- : AvroSource.from(getFilepattern()).withSchema(getRecordClass());
-
- return input.getPipeline().apply("Read", org.apache.beam.sdk.io.Read.from(source));
+ return input
+ .getPipeline()
+ .apply(
+ "Read",
+ org.apache.beam.sdk.io.Read.from(
+ createSource(getFilepattern(), getRecordClass(), getSchema())));
}
@Override
@@ -233,6 +279,70 @@ public class AvroIO {
builder.addIfNotNull(
DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
}
+
+ @SuppressWarnings("unchecked")
+ private static <T> AvroSource<T> createSource(
+ ValueProvider<String> filepattern, Class<T> recordClass, Schema schema) {
+ return recordClass == GenericRecord.class
+ ? (AvroSource<T>) AvroSource.from(filepattern).withSchema(schema)
+ : AvroSource.from(filepattern).withSchema(recordClass);
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /** Implementation of {@link #readAll}. */
+ @AutoValue
+ public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+ @Nullable abstract Class<T> getRecordClass();
+ @Nullable abstract Schema getSchema();
+ abstract long getDesiredBundleSizeBytes();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setRecordClass(Class<T> recordClass);
+ abstract Builder<T> setSchema(Schema schema);
+ abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+ abstract ReadAll<T> build();
+ }
+
+ @VisibleForTesting
+ ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+ return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<String> input) {
+ checkNotNull(getSchema(), "schema");
+ return input
+ .apply(
+ "Read all via FileBasedSource",
+ new ReadAllViaFileBasedSource<>(
+ SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
+ getDesiredBundleSizeBytes(),
+ new CreateSourceFn<>(getRecordClass(), getSchema().toString())))
+ .setCoder(AvroCoder.of(getRecordClass(), getSchema()));
+ }
+ }
+
+ private static class CreateSourceFn<T>
+ implements SerializableFunction<String, FileBasedSource<T>> {
+ private final Class<T> recordClass;
+ private final Supplier<Schema> schemaSupplier;
+
+ public CreateSourceFn(Class<T> recordClass, String jsonSchema) {
+ this.recordClass = recordClass;
+ this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema);
+ }
+
+ @Override
+ public FileBasedSource<T> apply(String input) {
+ return Read.createSource(
+ StaticValueProvider.of(input), recordClass, schemaSupplier.get());
+ }
}
/////////////////////////////////////////////////////////////////////////////
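A minimal usage sketch of the new entry points (not part of the commit; "LogRecord" is a hypothetical Avro-generated class and the paths are placeholders), in the style of the javadoc example above:

    Pipeline p = ...;

    // The filepatterns to read arrive as data rather than at construction time.
    PCollection<String> filepatterns =
        p.apply(Create.of(
            "gs://my_bucket/logs/2017/07/*.avro",
            "gs://my_bucket/logs/2017/08/*.avro"));

    // Specific records, using the Avro-generated class.
    PCollection<LogRecord> records =
        filepatterns.apply(AvroIO.readAll(LogRecord.class));

    // GenericRecords, given a Schema object (a JSON schema string also works).
    PCollection<GenericRecord> genericRecords =
        filepatterns.apply(AvroIO.readAllGenericRecords(schema));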
http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java
new file mode 100644
index 0000000..65c5bf1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import java.io.Serializable;
+import org.apache.avro.Schema;
+
+/** Helpers for working with Avro. */
+class AvroUtils {
+ /** Helper to get around the fact that {@link Schema} itself is not serializable. */
+ public static Supplier<Schema> serializableSchemaSupplier(String jsonSchema) {
+ return Suppliers.memoize(
+ Suppliers.compose(new JsonToSchema(), Suppliers.ofInstance(jsonSchema)));
+ }
+
+ private static class JsonToSchema implements Function<String, Schema>, Serializable {
+ @Override
+ public Schema apply(String input) {
+ return new Schema.Parser().parse(input);
+ }
+ }
+}
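Since org.apache.avro.Schema itself is not Serializable, the supplier lets serializable callbacks (such as CreateSourceFn above) carry a schema as its JSON string and parse it lazily, at most once per deserialized instance. A rough sketch of the pattern, assuming a class in the same package (AvroUtils is package-private) and not additional API:

    // Hypothetical serializable holder that keeps the schema only in JSON form.
    class SchemaHolder implements Serializable {
      private final Supplier<Schema> schemaSupplier;

      SchemaHolder(String jsonSchema) {
        this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema);
      }

      Schema schema() {
        return schemaSupplier.get();  // parsed on first use, memoized afterwards
      }
    }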
http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4380c57..df5d26c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -30,6 +30,7 @@ import static org.junit.Assert.assertTrue;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import java.io.File;
@@ -152,10 +153,68 @@ public class AvroIOTest {
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
writePipeline.run().waitUntilFinish();
- PCollection<GenericClass> input =
- readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+ // Test both read() and readAll()
+ PAssert.that(
+ readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+ .containsInAnyOrder(values);
+ PAssert.that(
+ readPipeline
+ .apply(Create.of(outputFile.getAbsolutePath()))
+ .apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ .containsInAnyOrder(values);
+
+ readPipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testAvroIOWriteAndReadMultipleFilepatterns() throws Throwable {
+ List<GenericClass> firstValues = Lists.newArrayList();
+ List<GenericClass> secondValues = Lists.newArrayList();
+ for (int i = 0; i < 10; ++i) {
+ firstValues.add(new GenericClass(i, "a" + i));
+ secondValues.add(new GenericClass(i, "b" + i));
+ }
+ writePipeline
+ .apply("Create first", Create.of(firstValues))
+ .apply(
+ "Write first",
+ AvroIO.write(GenericClass.class)
+ .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
+ .withNumShards(2));
+ writePipeline
+ .apply("Create second", Create.of(secondValues))
+ .apply(
+ "Write second",
+ AvroIO.write(GenericClass.class)
+ .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
+ .withNumShards(3));
+ writePipeline.run().waitUntilFinish();
+
+ // Test both read() and readAll()
+ PAssert.that(
+ readPipeline.apply(
+ "Read first",
+ AvroIO.read(GenericClass.class)
+ .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")))
+ .containsInAnyOrder(firstValues);
+ PAssert.that(
+ readPipeline.apply(
+ "Read second",
+ AvroIO.read(GenericClass.class)
+ .from(tmpFolder.getRoot().getAbsolutePath() + "/second*")))
+ .containsInAnyOrder(secondValues);
+ PAssert.that(
+ readPipeline
+ .apply(
+ "Create paths",
+ Create.of(
+ tmpFolder.getRoot().getAbsolutePath() + "/first*",
+ tmpFolder.getRoot().getAbsolutePath() + "/second*"))
+ .apply(
+ "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
- PAssert.that(input).containsInAnyOrder(values);
readPipeline.run();
}
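The tests above exercise readAll(Class); a hypothetical companion check for readAllGenericRecords (not part of this commit) could reuse the same output file and, for simplicity, verify only the element count:

    // Sketch only: read the records back as GenericRecords and count them.
    Schema schema = ReflectData.get().getSchema(GenericClass.class);
    PAssert.thatSingleton(
            readPipeline
                .apply("Create path", Create.of(outputFile.getAbsolutePath()))
                .apply(
                    "Read all generic",
                    AvroIO.readAllGenericRecords(schema).withDesiredBundleSizeBytes(10))
                .apply(Count.<GenericRecord>globally()))
        .isEqualTo((long) values.size());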
[2/4] beam git commit: Extracts common logic from TextIO.ReadAll into a utility transform
Posted by jk...@apache.org.
Extracts common logic from TextIO.ReadAll into a utility transform
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaf0b363
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaf0b363
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaf0b363
Branch: refs/heads/master
Commit: eaf0b36313fcd59963b2efbf16f50dd913da7de2
Parents: e80c83b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 21 14:09:13 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 25 17:36:49 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/ReadAllViaFileBasedSource.java | 152 +++++++++++++++++++
.../java/org/apache/beam/sdk/io/TextIO.java | 135 ++++------------
2 files changed, 179 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/eaf0b363/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
new file mode 100644
index 0000000..66aa41e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Reads each filepattern in the input {@link PCollection} using given parameters for splitting
+ * files into offset ranges and for creating a {@link FileBasedSource} for a file.
+ */
+class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PCollection<T>> {
+ private final SerializableFunction<String, Boolean> isSplittable;
+ private final long desiredBundleSizeBytes;
+ private final SerializableFunction<String, FileBasedSource<T>> createSource;
+
+ public ReadAllViaFileBasedSource(
+ SerializableFunction<String, Boolean> isSplittable,
+ long desiredBundleSizeBytes,
+ SerializableFunction<String, FileBasedSource<T>> createSource) {
+ this.isSplittable = isSplittable;
+ this.desiredBundleSizeBytes = desiredBundleSizeBytes;
+ this.createSource = createSource;
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<String> input) {
+ return input
+ .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
+ .apply(
+ "Split into ranges",
+ ParDo.of(new SplitIntoRangesFn(isSplittable, desiredBundleSizeBytes)))
+ .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<MatchResult.Metadata, OffsetRange>>())
+ .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource)));
+ }
+
+ private static class ReshuffleWithUniqueKey<T>
+ extends PTransform<PCollection<T>, PCollection<T>> {
+ @Override
+ public PCollection<T> expand(PCollection<T> input) {
+ return input
+ .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>()))
+ .apply("Reshuffle", Reshuffle.<Integer, T>of())
+ .apply("Values", Values.<T>create());
+ }
+ }
+
+ private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> {
+ private int index;
+
+ @Setup
+ public void setup() {
+ this.index = ThreadLocalRandom.current().nextInt();
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ c.output(KV.of(++index, c.element()));
+ }
+ }
+
+ private static class ExpandGlobFn extends DoFn<String, MatchResult.Metadata> {
+ @ProcessElement
+ public void process(ProcessContext c) throws Exception {
+ MatchResult match = FileSystems.match(c.element());
+ checkArgument(
+ match.status().equals(MatchResult.Status.OK),
+ "Failed to match filepattern %s: %s",
+ c.element(),
+ match.status());
+ for (MatchResult.Metadata metadata : match.metadata()) {
+ c.output(metadata);
+ }
+ }
+ }
+
+ private static class SplitIntoRangesFn
+ extends DoFn<MatchResult.Metadata, KV<MatchResult.Metadata, OffsetRange>> {
+ private final SerializableFunction<String, Boolean> isSplittable;
+ private final long desiredBundleSizeBytes;
+
+ private SplitIntoRangesFn(
+ SerializableFunction<String, Boolean> isSplittable, long desiredBundleSizeBytes) {
+ this.isSplittable = isSplittable;
+ this.desiredBundleSizeBytes = desiredBundleSizeBytes;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ MatchResult.Metadata metadata = c.element();
+ if (!metadata.isReadSeekEfficient()
+ || !isSplittable.apply(metadata.resourceId().toString())) {
+ c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
+ return;
+ }
+ for (OffsetRange range :
+ new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSizeBytes, 0)) {
+ c.output(KV.of(metadata, range));
+ }
+ }
+ }
+
+ private static class ReadFileRangesFn<T> extends DoFn<KV<MatchResult.Metadata, OffsetRange>, T> {
+ private final SerializableFunction<String, FileBasedSource<T>> createSource;
+
+ private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> createSource) {
+ this.createSource = createSource;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) throws IOException {
+ MatchResult.Metadata metadata = c.element().getKey();
+ OffsetRange range = c.element().getValue();
+ FileBasedSource<T> source = createSource.apply(metadata.toString());
+ try (BoundedSource.BoundedReader<T> reader =
+ source
+ .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo())
+ .createReader(c.getPipelineOptions())) {
+ for (boolean more = reader.start(); more; more = reader.advance()) {
+ c.output(reader.getCurrent());
+ }
+ }
+ }
+ }
+}
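An IO that wants to offer a readAll() flavor wires the utility up with an isSplittable predicate, a desired bundle size, and a per-file source factory. A sketch for some element type T (names here are illustrative; AvroIO.ReadAll above and TextIO.ReadAll below follow the same shape):

    filepatterns
        .apply(
            "Read all via FileBasedSource",
            new ReadAllViaFileBasedSource<T>(
                SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
                64 * 1024 * 1024L /* desiredBundleSizeBytes */,
                createSourceFn /* SerializableFunction<String, FileBasedSource<T>> */))
        .setCoder(outputCoder);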
http://git-wip-us.apache.org/repos/asf/beam/blob/eaf0b363/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 7b4c483..73040da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,8 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -37,23 +35,14 @@ import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
-import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -356,120 +345,50 @@ public class TextIO {
@Override
public PCollection<String> expand(PCollection<String> input) {
return input
- .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
.apply(
- "Split into ranges",
- ParDo.of(new SplitIntoRangesFn(getCompressionType(), getDesiredBundleSizeBytes())))
- .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>())
- .apply("Read", ParDo.of(new ReadTextFn(this)));
+ "Read all via FileBasedSource",
+ new ReadAllViaFileBasedSource<>(
+ new IsSplittableFn(getCompressionType()),
+ getDesiredBundleSizeBytes(),
+ new CreateTextSourceFn(getCompressionType())))
+ .setCoder(StringUtf8Coder.of());
}
- private static class ReshuffleWithUniqueKey<T>
- extends PTransform<PCollection<T>, PCollection<T>> {
- @Override
- public PCollection<T> expand(PCollection<T> input) {
- return input
- .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>()))
- .apply("Reshuffle", Reshuffle.<Integer, T>of())
- .apply("Values", Values.<T>create());
- }
- }
-
- private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> {
- private int index;
-
- @Setup
- public void setup() {
- this.index = ThreadLocalRandom.current().nextInt();
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
- @ProcessElement
- public void process(ProcessContext c) {
- c.output(KV.of(++index, c.element()));
- }
+ builder.add(
+ DisplayData.item("compressionType", getCompressionType().toString())
+ .withLabel("Compression Type"));
}
- private static class ExpandGlobFn extends DoFn<String, Metadata> {
- @ProcessElement
- public void process(ProcessContext c) throws Exception {
- MatchResult match = FileSystems.match(c.element());
- checkArgument(
- match.status().equals(Status.OK),
- "Failed to match filepattern %s: %s",
- c.element(),
- match.status());
- for (Metadata metadata : match.metadata()) {
- c.output(metadata);
- }
- }
- }
-
- private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> {
+ private static class CreateTextSourceFn
+ implements SerializableFunction<String, FileBasedSource<String>> {
private final CompressionType compressionType;
- private final long desiredBundleSize;
- private SplitIntoRangesFn(CompressionType compressionType, long desiredBundleSize) {
+ private CreateTextSourceFn(CompressionType compressionType) {
this.compressionType = compressionType;
- this.desiredBundleSize = desiredBundleSize;
- }
-
- @ProcessElement
- public void process(ProcessContext c) {
- Metadata metadata = c.element();
- final boolean isSplittable = isSplittable(metadata, compressionType);
- if (!isSplittable) {
- c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
- return;
- }
- for (OffsetRange range :
- new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSize, 0)) {
- c.output(KV.of(metadata, range));
- }
}
- static boolean isSplittable(Metadata metadata, CompressionType compressionType) {
- if (!metadata.isReadSeekEfficient()) {
- return false;
- }
- switch (compressionType) {
- case AUTO:
- return !CompressionMode.isCompressed(metadata.resourceId().toString());
- case UNCOMPRESSED:
- return true;
- case GZIP:
- case BZIP2:
- case ZIP:
- case DEFLATE:
- return false;
- default:
- throw new UnsupportedOperationException("Unknown compression type: " + compressionType);
- }
+ @Override
+ public FileBasedSource<String> apply(String input) {
+ return Read.wrapWithCompression(
+ new TextSource(StaticValueProvider.of(input)), compressionType);
}
}
- private static class ReadTextFn extends DoFn<KV<Metadata, OffsetRange>, String> {
- private final TextIO.ReadAll spec;
+ private static class IsSplittableFn implements SerializableFunction<String, Boolean> {
+ private final CompressionType compressionType;
- private ReadTextFn(ReadAll spec) {
- this.spec = spec;
+ private IsSplittableFn(CompressionType compressionType) {
+ this.compressionType = compressionType;
}
- @ProcessElement
- public void process(ProcessContext c) throws IOException {
- Metadata metadata = c.element().getKey();
- OffsetRange range = c.element().getValue();
- FileBasedSource<String> source =
- TextIO.Read.wrapWithCompression(
- new TextSource(StaticValueProvider.of(metadata.toString())),
- spec.getCompressionType());
- try (BoundedSource.BoundedReader<String> reader =
- source
- .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo())
- .createReader(c.getPipelineOptions())) {
- for (boolean more = reader.start(); more; more = reader.advance()) {
- c.output(reader.getCurrent());
- }
- }
+ @Override
+ public Boolean apply(String filename) {
+ return compressionType == CompressionType.UNCOMPRESSED
+ || (compressionType == CompressionType.AUTO && !CompressionMode.isCompressed(filename));
}
}
}
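User-visible behavior of TextIO.readAll() is unchanged by this refactoring; a typical application still looks like the following sketch (the filepattern and compression type are placeholders):

    PCollection<String> filepatterns =
        p.apply(Create.of("gs://my_bucket/path/to/*.gz"));
    PCollection<String> lines =
        filepatterns.apply(
            TextIO.readAll().withCompressionType(TextIO.CompressionType.GZIP));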
[3/4] beam git commit: Adds ValueProvider support to AvroIO.Read
Posted by jk...@apache.org.
Adds ValueProvider support to AvroIO.Read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e80c83b2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e80c83b2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e80c83b2
Branch: refs/heads/master
Commit: e80c83b2a0a1cf55aa8a452a02a76c9dc13697cc
Parents: 71196ec
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 21 12:38:17 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 25 17:36:49 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 49 ++++++++------------
.../java/org/apache/beam/sdk/io/AvroSource.java | 24 +++++++---
.../apache/beam/sdk/io/BlockBasedSource.java | 6 +++
3 files changed, 42 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 89cadbd..d308c85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -185,10 +185,10 @@ public class AvroIO {
.setWindowedWrites(false);
}
- /** Implementation of {@link #read}. */
+ /** Implementation of {@link #read} and {@link #readGenericRecords}. */
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
- @Nullable abstract String getFilepattern();
+ @Nullable abstract ValueProvider<String> getFilepattern();
@Nullable abstract Class<T> getRecordClass();
@Nullable abstract Schema getSchema();
@@ -196,7 +196,7 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
- abstract Builder<T> setFilepattern(String filepattern);
+ abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
abstract Builder<T> setRecordClass(Class<T> recordClass);
abstract Builder<T> setSchema(Schema schema);
@@ -204,45 +204,34 @@ public class AvroIO {
}
/** Reads from the given filename or filepattern. */
- public Read<T> from(String filepattern) {
+ public Read<T> from(ValueProvider<String> filepattern) {
return toBuilder().setFilepattern(filepattern).build();
}
+ /** Like {@link #from(ValueProvider)}. */
+ public Read<T> from(String filepattern) {
+ return from(StaticValueProvider.of(filepattern));
+ }
+
@Override
public PCollection<T> expand(PBegin input) {
- if (getFilepattern() == null) {
- throw new IllegalStateException(
- "need to set the filepattern of an AvroIO.Read transform");
- }
- if (getSchema() == null) {
- throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
- }
+ checkNotNull(getFilepattern(), "filepattern");
+ checkNotNull(getSchema(), "schema");
@SuppressWarnings("unchecked")
- Bounded<T> read =
+ AvroSource<T> source =
getRecordClass() == GenericRecord.class
- ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
- AvroSource.from(getFilepattern()).withSchema(getSchema()))
- : org.apache.beam.sdk.io.Read.from(
- AvroSource.from(getFilepattern()).withSchema(getRecordClass()));
-
- PCollection<T> pcol = input.getPipeline().apply("Read", read);
- // Honor the default output coder that would have been used by this PTransform.
- pcol.setCoder(getDefaultOutputCoder());
- return pcol;
+ ? (AvroSource<T>) AvroSource.from(getFilepattern()).withSchema(getSchema())
+ : AvroSource.from(getFilepattern()).withSchema(getRecordClass());
+
+ return input.getPipeline().apply("Read", org.apache.beam.sdk.io.Read.from(source));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("filePattern", getFilepattern())
- .withLabel("Input File Pattern"));
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return AvroCoder.of(getRecordClass(), getSchema());
+ builder.addIfNotNull(
+ DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 7cd97a8..a98d870 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
@@ -140,26 +141,31 @@ public class AvroSource<T> extends BlockBasedSource<T> {
* Reads from the given file name or pattern ("glob"). The returned source can be further
* configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
*/
- public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
+ public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class);
}
+ /** Like {@link #from(ValueProvider)}. */
+ public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
+ return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
+ }
+
/** Reads files containing records that conform to the given schema. */
public AvroSource<GenericRecord> withSchema(String schema) {
return new AvroSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class);
+ getFileOrPatternSpecProvider(), getMinBundleSize(), schema, GenericRecord.class);
}
/** Like {@link #withSchema(String)}. */
public AvroSource<GenericRecord> withSchema(Schema schema) {
return new AvroSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), schema.toString(), GenericRecord.class);
+ getFileOrPatternSpecProvider(), getMinBundleSize(), schema.toString(), GenericRecord.class);
}
/** Reads files containing records of the given class. */
public <X> AvroSource<X> withSchema(Class<X> clazz) {
return new AvroSource<>(
- getFileOrPatternSpec(),
+ getFileOrPatternSpecProvider(),
getMinBundleSize(),
ReflectData.get().getSchema(clazz).toString(),
clazz);
@@ -170,12 +176,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
* minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
- return new AvroSource<>(getFileOrPatternSpec(), minBundleSize, readerSchemaString, type);
+ return new AvroSource<>(
+ getFileOrPatternSpecProvider(), minBundleSize, readerSchemaString, type);
}
/** Constructor for FILEPATTERN mode. */
private AvroSource(
- String fileNameOrPattern, long minBundleSize, String readerSchemaString, Class<T> type) {
+ ValueProvider<String> fileNameOrPattern,
+ long minBundleSize,
+ String readerSchemaString,
+ Class<T> type) {
super(fileNameOrPattern, minBundleSize);
this.readerSchemaString = internSchemaString(readerSchemaString);
this.type = type;
@@ -378,7 +388,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
type);
case FILEPATTERN:
return new AvroSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), readerSchemaString, type);
+ getFileOrPatternSpecProvider(), getMinBundleSize(), readerSchemaString, type);
default:
throw new InvalidObjectException(
String.format("Unknown mode %s for AvroSource %s", getMode(), this));
http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index cf6671e..25e8483 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
/**
@@ -69,6 +70,11 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
}
+ /** Like {@link #BlockBasedSource(String, long)}. */
+ public BlockBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+ super(fileOrPatternSpec, minBundleSize);
+ }
+
/**
* Creates a {@code BlockBasedSource} for a single file. Subclasses must call this constructor
* when implementing {@link BlockBasedSource#createForSubrangeOfFile}. See documentation in
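With ValueProvider support, the filepattern for AvroIO.Read can be supplied at runtime, e.g. as a template parameter, rather than being fixed at pipeline construction. A sketch, where MyOptions and getInputFilepattern() are hypothetical:

    public interface MyOptions extends PipelineOptions {
      ValueProvider<String> getInputFilepattern();
      void setInputFilepattern(ValueProvider<String> value);
    }

    MyOptions options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
    Pipeline p = Pipeline.create(options);
    PCollection<GenericRecord> records =
        p.apply(AvroIO.readGenericRecords(schema).from(options.getInputFilepattern()));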
[4/4] beam git commit: This closes #3615: [BEAM-2656] Introduces AvroIO.readAll()
Posted by jk...@apache.org.
This closes #3615: [BEAM-2656] Introduces AvroIO.readAll()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d919394c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d919394c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d919394c
Branch: refs/heads/master
Commit: d919394c7fb4080a2069d24da5a09a410e215e9f
Parents: 71196ec ee1bcba
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 25 17:37:06 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 25 17:37:06 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 165 +++++++++++++++----
.../java/org/apache/beam/sdk/io/AvroSource.java | 24 ++-
.../java/org/apache/beam/sdk/io/AvroUtils.java | 40 +++++
.../apache/beam/sdk/io/BlockBasedSource.java | 6 +
.../beam/sdk/io/ReadAllViaFileBasedSource.java | 152 +++++++++++++++++
.../java/org/apache/beam/sdk/io/TextIO.java | 135 +++------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 65 +++++++-
7 files changed, 436 insertions(+), 151 deletions(-)
----------------------------------------------------------------------