Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/26 00:51:50 UTC

[1/4] beam git commit: Introduces AvroIO.readAll() and readAllGenericRecords()

Repository: beam
Updated Branches:
  refs/heads/master 71196ec9c -> d919394c7


Introduces AvroIO.readAll() and readAllGenericRecords()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee1bcbae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee1bcbae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee1bcbae

Branch: refs/heads/master
Commit: ee1bcbae08fb221e392175fbd0387594653d4a86
Parents: eaf0b36
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 21 14:09:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 25 17:36:49 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 132 +++++++++++++++++--
 .../java/org/apache/beam/sdk/io/AvroUtils.java  |  40 ++++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  65 ++++++++-
 3 files changed, 223 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
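
For orientation, a minimal use of the new API might look like the following
sketch (the pipeline, filepatterns, and schema are placeholders, not part of
this commit):

    Pipeline p = Pipeline.create(options);
    PCollection<String> filepatterns =
        p.apply(Create.of("gs://bucket/dir1/*.avro", "gs://bucket/dir2/*.avro"));
    // Each element of the input PCollection is treated as a filepattern to expand and read.
    PCollection<GenericRecord> records =
        filepatterns.apply(AvroIO.readAllGenericRecords(schema));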


http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d308c85..f201114 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.io.BaseEncoding;
@@ -54,14 +56,17 @@ import org.apache.beam.sdk.values.PDone;
  * {@link PTransform}s for reading and writing Avro files.
  *
  * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using
- * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. See {@link
- * FileSystems} for information on supported file systems and filepatterns.
+ * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. Alternatively, if
+ * the filepatterns to be read are themselves in a {@link PCollection}, apply {@link #readAll}.
+ *
+ * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
  *
  * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
  * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
  * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
  * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
- * schema.
+ * schema. Likewise, if the filepatterns to read are themselves in a {@link PCollection}, apply
+ * {@link #readAllGenericRecords}.
  *
  * <p>For example:
  *
@@ -79,6 +84,18 @@ import org.apache.beam.sdk.values.PDone;
  *                .from("gs://my_bucket/path/to/records-*.avro"));
  * }</pre>
  *
+ * <p>Reading from a {@link PCollection} of filepatterns:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> filepatterns = p.apply(...);
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
+ * PCollection<GenericRecord> genericRecords =
+ *     filepatterns.apply(AvroIO.readAllGenericRecords(schema));
+ * }</pre>
+ *
  * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
  * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
  * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
@@ -133,6 +150,18 @@ public class AvroIO {
         .build();
   }
 
+  /** Like {@link #read}, but reads each filepattern in the input {@link PCollection}. */
+  public static <T> ReadAll<T> readAll(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadAll.Builder<T>()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        // 64MB is a reasonable value that allows amortizing the cost of opening files,
+        // but is not so large as to exhaust a typical runner's maximum amount of output per
+        // ProcessElement call.
+        .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+        .build();
+  }
+
   /** Reads Avro file(s) containing records of the specified schema. */
   public static Read<GenericRecord> readGenericRecords(Schema schema) {
     return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
@@ -142,6 +171,17 @@ public class AvroIO {
   }
 
   /**
+   * Like {@link #readGenericRecords(Schema)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   */
+  public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        // Required: desiredBundleSizeBytes is a primitive AutoValue property and
+        // must be set before build(); match readAll()'s 64MB default.
+        .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+        .build();
+  }
+
+  /**
    * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
    * JSON-encoded string.
    */
@@ -150,6 +190,14 @@ public class AvroIO {
   }
 
   /**
+   * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   */
+  public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
+    return readAllGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
    * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
    * pattern).
    */
@@ -217,14 +265,12 @@ public class AvroIO {
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getFilepattern(), "filepattern");
       checkNotNull(getSchema(), "schema");
-
-      @SuppressWarnings("unchecked")
-      AvroSource<T> source =
-          getRecordClass() == GenericRecord.class
-              ? (AvroSource<T>) AvroSource.from(getFilepattern()).withSchema(getSchema())
-              : AvroSource.from(getFilepattern()).withSchema(getRecordClass());
-
-      return input.getPipeline().apply("Read", org.apache.beam.sdk.io.Read.from(source));
+      return input
+          .getPipeline()
+          .apply(
+              "Read",
+              org.apache.beam.sdk.io.Read.from(
+                  createSource(getFilepattern(), getRecordClass(), getSchema())));
     }
 
     @Override
@@ -233,6 +279,70 @@ public class AvroIO {
       builder.addIfNotNull(
           DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
     }
+
+    @SuppressWarnings("unchecked")
+    private static <T> AvroSource<T> createSource(
+        ValueProvider<String> filepattern, Class<T> recordClass, Schema schema) {
+      return recordClass == GenericRecord.class
+          ? (AvroSource<T>) AvroSource.from(filepattern).withSchema(schema)
+          : AvroSource.from(filepattern).withSchema(recordClass);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #readAll}. */
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    @Nullable abstract Class<T> getRecordClass();
+    @Nullable abstract Schema getSchema();
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+      abstract Builder<T> setSchema(Schema schema);
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ReadAll<T> build();
+    }
+
+    @VisibleForTesting
+    ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<String> input) {
+      checkNotNull(getSchema(), "schema");
+      return input
+          .apply(
+              "Read all via FileBasedSource",
+              new ReadAllViaFileBasedSource<>(
+                  SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
+                  getDesiredBundleSizeBytes(),
+                  new CreateSourceFn<>(getRecordClass(), getSchema().toString())))
+          .setCoder(AvroCoder.of(getRecordClass(), getSchema()));
+    }
+  }
+
+  private static class CreateSourceFn<T>
+      implements SerializableFunction<String, FileBasedSource<T>> {
+    private final Class<T> recordClass;
+    private final Supplier<Schema> schemaSupplier;
+
+    public CreateSourceFn(Class<T> recordClass, String jsonSchema) {
+      this.recordClass = recordClass;
+      this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema);
+    }
+
+    @Override
+    public FileBasedSource<T> apply(String input) {
+      return Read.createSource(
+          StaticValueProvider.of(input), recordClass, schemaSupplier.get());
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java
new file mode 100644
index 0000000..65c5bf1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import java.io.Serializable;
+import org.apache.avro.Schema;
+
+/** Helpers for working with Avro. */
+class AvroUtils {
+  /** Helper to get around the fact that {@link Schema} itself is not serializable. */
+  public static Supplier<Schema> serializableSchemaSupplier(String jsonSchema) {
+    return Suppliers.memoize(
+        Suppliers.compose(new JsonToSchema(), Suppliers.ofInstance(jsonSchema)));
+  }
+
+  private static class JsonToSchema implements Function<String, Schema>, Serializable {
+    @Override
+    public Schema apply(String input) {
+      return new Schema.Parser().parse(input);
+    }
+  }
+}
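
Since Avro's Schema is not serializable, the supplier ships the JSON string and
parses it lazily on the worker; a sketch of the observable behavior (jsonSchema
is a placeholder):

    Supplier<Schema> supplier = AvroUtils.serializableSchemaSupplier(jsonSchema);
    Schema schema = supplier.get();   // JSON parsed on first call
    Schema again = supplier.get();    // memoized: returns the same cached instance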

http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4380c57..df5d26c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -30,6 +30,7 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import java.io.File;
@@ -152,10 +153,68 @@ public class AvroIOTest {
         .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
     writePipeline.run().waitUntilFinish();
 
-    PCollection<GenericClass> input =
-        readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+    // Test both read() and readAll()
+    PAssert.that(
+            readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+        .containsInAnyOrder(values);
+    PAssert.that(
+            readPipeline
+                .apply(Create.of(outputFile.getAbsolutePath()))
+                .apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(values);
+
+    readPipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAvroIOWriteAndReadMultipleFilepatterns() throws Throwable {
+    List<GenericClass> firstValues = Lists.newArrayList();
+    List<GenericClass> secondValues = Lists.newArrayList();
+    for (int i = 0; i < 10; ++i) {
+      firstValues.add(new GenericClass(i, "a" + i));
+      secondValues.add(new GenericClass(i, "b" + i));
+    }
+    writePipeline
+        .apply("Create first", Create.of(firstValues))
+        .apply(
+            "Write first",
+            AvroIO.write(GenericClass.class)
+                .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
+                .withNumShards(2));
+    writePipeline
+        .apply("Create second", Create.of(secondValues))
+        .apply(
+            "Write second",
+            AvroIO.write(GenericClass.class)
+                .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
+                .withNumShards(3));
+    writePipeline.run().waitUntilFinish();
+
+    // Test both read() and readAll()
+    PAssert.that(
+            readPipeline.apply(
+                "Read first",
+                AvroIO.read(GenericClass.class)
+                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")))
+        .containsInAnyOrder(firstValues);
+    PAssert.that(
+            readPipeline.apply(
+                "Read second",
+                AvroIO.read(GenericClass.class)
+                    .from(tmpFolder.getRoot().getAbsolutePath() + "/second*")))
+        .containsInAnyOrder(secondValues);
+    PAssert.that(
+            readPipeline
+                .apply(
+                    "Create paths",
+                    Create.of(
+                        tmpFolder.getRoot().getAbsolutePath() + "/first*",
+                        tmpFolder.getRoot().getAbsolutePath() + "/second*"))
+                .apply(
+                    "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
 
-    PAssert.that(input).containsInAnyOrder(values);
     readPipeline.run();
   }
 


[2/4] beam git commit: Extracts common logic from TextIO.ReadAll into a utility transform

Posted by jk...@apache.org.
Extracts common logic from TextIO.ReadAll into a utility transform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaf0b363
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaf0b363
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaf0b363

Branch: refs/heads/master
Commit: eaf0b36313fcd59963b2efbf16f50dd913da7de2
Parents: e80c83b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 21 14:09:13 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 25 17:36:49 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/ReadAllViaFileBasedSource.java  | 152 +++++++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIO.java     | 135 ++++------------
 2 files changed, 179 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eaf0b363/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
new file mode 100644
index 0000000..66aa41e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Reads each filepattern in the input {@link PCollection} using the given parameters for splitting
+ * files into offset ranges and for creating a {@link FileBasedSource} for a file.
+ */
+class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PCollection<T>> {
+  private final SerializableFunction<String, Boolean> isSplittable;
+  private final long desiredBundleSizeBytes;
+  private final SerializableFunction<String, FileBasedSource<T>> createSource;
+
+  public ReadAllViaFileBasedSource(
+      SerializableFunction<String, Boolean> isSplittable,
+      long desiredBundleSizeBytes,
+      SerializableFunction<String, FileBasedSource<T>> createSource) {
+    this.isSplittable = isSplittable;
+    this.desiredBundleSizeBytes = desiredBundleSizeBytes;
+    this.createSource = createSource;
+  }
+
+  @Override
+  public PCollection<T> expand(PCollection<String> input) {
+    return input
+        .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
+        .apply(
+            "Split into ranges",
+            ParDo.of(new SplitIntoRangesFn(isSplittable, desiredBundleSizeBytes)))
+        .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<MatchResult.Metadata, OffsetRange>>())
+        .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource)));
+  }
+
+  private static class ReshuffleWithUniqueKey<T>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>()))
+          .apply("Reshuffle", Reshuffle.<Integer, T>of())
+          .apply("Values", Values.<T>create());
+    }
+  }
+
+  private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> {
+    private int index;
+
+    @Setup
+    public void setup() {
+      this.index = ThreadLocalRandom.current().nextInt();
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      c.output(KV.of(++index, c.element()));
+    }
+  }
+
+  private static class ExpandGlobFn extends DoFn<String, MatchResult.Metadata> {
+    @ProcessElement
+    public void process(ProcessContext c) throws Exception {
+      MatchResult match = FileSystems.match(c.element());
+      checkArgument(
+          match.status().equals(MatchResult.Status.OK),
+          "Failed to match filepattern %s: %s",
+          c.element(),
+          match.status());
+      for (MatchResult.Metadata metadata : match.metadata()) {
+        c.output(metadata);
+      }
+    }
+  }
+
+  private static class SplitIntoRangesFn
+      extends DoFn<MatchResult.Metadata, KV<MatchResult.Metadata, OffsetRange>> {
+    private final SerializableFunction<String, Boolean> isSplittable;
+    private final long desiredBundleSizeBytes;
+
+    private SplitIntoRangesFn(
+        SerializableFunction<String, Boolean> isSplittable, long desiredBundleSizeBytes) {
+      this.isSplittable = isSplittable;
+      this.desiredBundleSizeBytes = desiredBundleSizeBytes;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      MatchResult.Metadata metadata = c.element();
+      if (!metadata.isReadSeekEfficient()
+          || !isSplittable.apply(metadata.resourceId().toString())) {
+        c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
+        return;
+      }
+      for (OffsetRange range :
+          new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSizeBytes, 0)) {
+        c.output(KV.of(metadata, range));
+      }
+    }
+  }
+
+  private static class ReadFileRangesFn<T> extends DoFn<KV<MatchResult.Metadata, OffsetRange>, T> {
+    private final SerializableFunction<String, FileBasedSource<T>> createSource;
+
+    private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> createSource) {
+      this.createSource = createSource;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) throws IOException {
+      MatchResult.Metadata metadata = c.element().getKey();
+      OffsetRange range = c.element().getValue();
+      FileBasedSource<T> source = createSource.apply(metadata.resourceId().toString());
+      try (BoundedSource.BoundedReader<T> reader =
+          source
+              .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo())
+              .createReader(c.getPipelineOptions())) {
+        for (boolean more = reader.start(); more; more = reader.advance()) {
+          c.output(reader.getCurrent());
+        }
+      }
+    }
+  }
+}
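
For IO authors, wiring a readAll-style transform through this utility takes the
three constructor arguments shown above; a sketch, where CreateMySourceFn is a
hypothetical SerializableFunction<String, FileBasedSource<T>> and outputCoder a
coder for T (the transform itself does not infer one):

    PCollection<T> records =
        filepatterns
            .apply(
                "Read all via FileBasedSource",
                new ReadAllViaFileBasedSource<>(
                    SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
                    64 * 1024 * 1024L /* desiredBundleSizeBytes */,
                    new CreateMySourceFn()))
            .setCoder(outputCoder);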

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf0b363/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 7b4c483..73040da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,8 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -37,23 +35,14 @@ import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
-import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -356,120 +345,50 @@ public class TextIO {
     @Override
     public PCollection<String> expand(PCollection<String> input) {
       return input
-          .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
           .apply(
-              "Split into ranges",
-              ParDo.of(new SplitIntoRangesFn(getCompressionType(), getDesiredBundleSizeBytes())))
-          .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>())
-          .apply("Read", ParDo.of(new ReadTextFn(this)));
+              "Read all via FileBasedSource",
+              new ReadAllViaFileBasedSource<>(
+                  new IsSplittableFn(getCompressionType()),
+                  getDesiredBundleSizeBytes(),
+                  new CreateTextSourceFn(getCompressionType())))
+          .setCoder(StringUtf8Coder.of());
     }
 
-    private static class ReshuffleWithUniqueKey<T>
-        extends PTransform<PCollection<T>, PCollection<T>> {
-      @Override
-      public PCollection<T> expand(PCollection<T> input) {
-        return input
-            .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>()))
-            .apply("Reshuffle", Reshuffle.<Integer, T>of())
-            .apply("Values", Values.<T>create());
-      }
-    }
-
-    private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> {
-      private int index;
-
-      @Setup
-      public void setup() {
-        this.index = ThreadLocalRandom.current().nextInt();
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
 
-      @ProcessElement
-      public void process(ProcessContext c) {
-        c.output(KV.of(++index, c.element()));
-      }
+      builder.add(
+          DisplayData.item("compressionType", getCompressionType().toString())
+              .withLabel("Compression Type"));
     }
 
-    private static class ExpandGlobFn extends DoFn<String, Metadata> {
-      @ProcessElement
-      public void process(ProcessContext c) throws Exception {
-        MatchResult match = FileSystems.match(c.element());
-        checkArgument(
-            match.status().equals(Status.OK),
-            "Failed to match filepattern %s: %s",
-            c.element(),
-            match.status());
-        for (Metadata metadata : match.metadata()) {
-          c.output(metadata);
-        }
-      }
-    }
-
-    private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> {
+    private static class CreateTextSourceFn
+        implements SerializableFunction<String, FileBasedSource<String>> {
       private final CompressionType compressionType;
-      private final long desiredBundleSize;
 
-      private SplitIntoRangesFn(CompressionType compressionType, long desiredBundleSize) {
+      private CreateTextSourceFn(CompressionType compressionType) {
         this.compressionType = compressionType;
-        this.desiredBundleSize = desiredBundleSize;
-      }
-
-      @ProcessElement
-      public void process(ProcessContext c) {
-        Metadata metadata = c.element();
-        final boolean isSplittable = isSplittable(metadata, compressionType);
-        if (!isSplittable) {
-          c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
-          return;
-        }
-        for (OffsetRange range :
-            new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSize, 0)) {
-          c.output(KV.of(metadata, range));
-        }
       }
 
-      static boolean isSplittable(Metadata metadata, CompressionType compressionType) {
-        if (!metadata.isReadSeekEfficient()) {
-          return false;
-        }
-        switch (compressionType) {
-          case AUTO:
-            return !CompressionMode.isCompressed(metadata.resourceId().toString());
-          case UNCOMPRESSED:
-            return true;
-          case GZIP:
-          case BZIP2:
-          case ZIP:
-          case DEFLATE:
-            return false;
-          default:
-            throw new UnsupportedOperationException("Unknown compression type: " + compressionType);
-        }
+      @Override
+      public FileBasedSource<String> apply(String input) {
+        return Read.wrapWithCompression(
+            new TextSource(StaticValueProvider.of(input)), compressionType);
       }
     }
 
-    private static class ReadTextFn extends DoFn<KV<Metadata, OffsetRange>, String> {
-      private final TextIO.ReadAll spec;
+    private static class IsSplittableFn implements SerializableFunction<String, Boolean> {
+      private final CompressionType compressionType;
 
-      private ReadTextFn(ReadAll spec) {
-        this.spec = spec;
+      private IsSplittableFn(CompressionType compressionType) {
+        this.compressionType = compressionType;
       }
 
-      @ProcessElement
-      public void process(ProcessContext c) throws IOException {
-        Metadata metadata = c.element().getKey();
-        OffsetRange range = c.element().getValue();
-        FileBasedSource<String> source =
-            TextIO.Read.wrapWithCompression(
-                new TextSource(StaticValueProvider.of(metadata.toString())),
-                spec.getCompressionType());
-        try (BoundedSource.BoundedReader<String> reader =
-            source
-                .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo())
-                .createReader(c.getPipelineOptions())) {
-          for (boolean more = reader.start(); more; more = reader.advance()) {
-            c.output(reader.getCurrent());
-          }
-        }
+      @Override
+      public Boolean apply(String filename) {
+        return compressionType == CompressionType.UNCOMPRESSED
+            || (compressionType == CompressionType.AUTO && !CompressionMode.isCompressed(filename));
       }
     }
   }
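
The refactoring is behavior-preserving for users of TextIO.readAll(), which is
still applied to a PCollection of filepatterns, e.g. (the bucket path is a
placeholder):

    PCollection<String> lines =
        p.apply(Create.of("gs://bucket/logs/part-*.txt"))
         .apply(TextIO.readAll());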


[3/4] beam git commit: Adds ValueProvider support to AvroIO.Read

Posted by jk...@apache.org.
Adds ValueProvider support to AvroIO.Read


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e80c83b2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e80c83b2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e80c83b2

Branch: refs/heads/master
Commit: e80c83b2a0a1cf55aa8a452a02a76c9dc13697cc
Parents: 71196ec
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 21 12:38:17 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 25 17:36:49 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 49 ++++++++------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 24 +++++++---
 .../apache/beam/sdk/io/BlockBasedSource.java    |  6 +++
 3 files changed, 42 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 89cadbd..d308c85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -185,10 +185,10 @@ public class AvroIO {
         .setWindowedWrites(false);
   }
 
-  /** Implementation of {@link #read}. */
+  /** Implementation of {@link #read} and {@link #readGenericRecords}. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-    @Nullable abstract String getFilepattern();
+    @Nullable abstract ValueProvider<String> getFilepattern();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
 
@@ -196,7 +196,7 @@ public class AvroIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setFilepattern(String filepattern);
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
 
@@ -204,45 +204,34 @@ public class AvroIO {
     }
 
     /** Reads from the given filename or filepattern. */
-    public Read<T> from(String filepattern) {
+    public Read<T> from(ValueProvider<String> filepattern) {
       return toBuilder().setFilepattern(filepattern).build();
     }
 
+    /** Like {@link #from(ValueProvider)}. */
+    public Read<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
-      if (getFilepattern() == null) {
-        throw new IllegalStateException(
-            "need to set the filepattern of an AvroIO.Read transform");
-      }
-      if (getSchema() == null) {
-        throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
-      }
+      checkNotNull(getFilepattern(), "filepattern");
+      checkNotNull(getSchema(), "schema");
 
       @SuppressWarnings("unchecked")
-      Bounded<T> read =
+      AvroSource<T> source =
           getRecordClass() == GenericRecord.class
-              ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
-                  AvroSource.from(getFilepattern()).withSchema(getSchema()))
-              : org.apache.beam.sdk.io.Read.from(
-                  AvroSource.from(getFilepattern()).withSchema(getRecordClass()));
-
-      PCollection<T> pcol = input.getPipeline().apply("Read", read);
-      // Honor the default output coder that would have been used by this PTransform.
-      pcol.setCoder(getDefaultOutputCoder());
-      return pcol;
+              ? (AvroSource<T>) AvroSource.from(getFilepattern()).withSchema(getSchema())
+              : AvroSource.from(getFilepattern()).withSchema(getRecordClass());
+
+      return input.getPipeline().apply("Read", org.apache.beam.sdk.io.Read.from(source));
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder
-        .addIfNotNull(DisplayData.item("filePattern", getFilepattern())
-          .withLabel("Input File Pattern"));
-    }
-
-    @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return AvroCoder.of(getRecordClass(), getSchema());
+      builder.addIfNotNull(
+          DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
     }
   }
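
With ValueProvider support, the filepattern can now be deferred to runtime,
e.g. supplied via pipeline options (the Options interface below is
illustrative, not part of this commit):

    public interface Options extends PipelineOptions {
      ValueProvider<String> getInputFilepattern();
      void setInputFilepattern(ValueProvider<String> value);
    }

    Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class);
    Pipeline p = Pipeline.create(options);
    // The actual filepattern value may be supplied when the pipeline is executed.
    p.apply(AvroIO.readGenericRecords(schema).from(options.getInputFilepattern()));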
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 7cd97a8..a98d870 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
@@ -140,26 +141,31 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * Reads from the given file name or pattern ("glob"). The returned source can be further
    * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
    */
-  public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
+  public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
     return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class);
   }
 
+  /** Like {@link #from(ValueProvider)}. */
+  public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
+    return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
+  }
+
   /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
     return new AvroSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class);
+        getFileOrPatternSpecProvider(), getMinBundleSize(), schema, GenericRecord.class);
   }
 
   /** Like {@link #withSchema(String)}. */
   public AvroSource<GenericRecord> withSchema(Schema schema) {
     return new AvroSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), schema.toString(), GenericRecord.class);
+        getFileOrPatternSpecProvider(), getMinBundleSize(), schema.toString(), GenericRecord.class);
   }
 
   /** Reads files containing records of the given class. */
   public <X> AvroSource<X> withSchema(Class<X> clazz) {
     return new AvroSource<>(
-        getFileOrPatternSpec(),
+        getFileOrPatternSpecProvider(),
         getMinBundleSize(),
         ReflectData.get().getSchema(clazz).toString(),
         clazz);
@@ -170,12 +176,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
-    return new AvroSource<>(getFileOrPatternSpec(), minBundleSize, readerSchemaString, type);
+    return new AvroSource<>(
+        getFileOrPatternSpecProvider(), minBundleSize, readerSchemaString, type);
   }
 
   /** Constructor for FILEPATTERN mode. */
   private AvroSource(
-      String fileNameOrPattern, long minBundleSize, String readerSchemaString, Class<T> type) {
+      ValueProvider<String> fileNameOrPattern,
+      long minBundleSize,
+      String readerSchemaString,
+      Class<T> type) {
     super(fileNameOrPattern, minBundleSize);
     this.readerSchemaString = internSchemaString(readerSchemaString);
     this.type = type;
@@ -378,7 +388,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
             type);
       case FILEPATTERN:
         return new AvroSource<>(
-            getFileOrPatternSpec(), getMinBundleSize(), readerSchemaString, type);
+            getFileOrPatternSpecProvider(), getMinBundleSize(), readerSchemaString, type);
         default:
           throw new InvalidObjectException(
               String.format("Unknown mode %s for AvroSource %s", getMode(), this));

http://git-wip-us.apache.org/repos/asf/beam/blob/e80c83b2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index cf6671e..25e8483 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 
 /**
@@ -69,6 +70,11 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
     super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
   }
 
+  /** Like {@link #BlockBasedSource(String, long)}. */
+  public BlockBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+    super(fileOrPatternSpec, minBundleSize);
+  }
+
   /**
    * Creates a {@code BlockBasedSource} for a single file. Subclasses must call this constructor
    * when implementing {@link BlockBasedSource#createForSubrangeOfFile}. See documentation in


[4/4] beam git commit: This closes #3615: [BEAM-2656] Introduces AvroIO.readAll()

Posted by jk...@apache.org.
This closes #3615: [BEAM-2656] Introduces AvroIO.readAll()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d919394c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d919394c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d919394c

Branch: refs/heads/master
Commit: d919394c7fb4080a2069d24da5a09a410e215e9f
Parents: 71196ec ee1bcba
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 25 17:37:06 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jul 25 17:37:06 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 165 +++++++++++++++----
 .../java/org/apache/beam/sdk/io/AvroSource.java |  24 ++-
 .../java/org/apache/beam/sdk/io/AvroUtils.java  |  40 +++++
 .../apache/beam/sdk/io/BlockBasedSource.java    |   6 +
 .../beam/sdk/io/ReadAllViaFileBasedSource.java  | 152 +++++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIO.java     | 135 +++------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  65 +++++++-
 7 files changed, 436 insertions(+), 151 deletions(-)
----------------------------------------------------------------------