Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/28 17:53:05 UTC

[1/4] beam git commit: Introduces TypeDescriptors.extractFromTypeParameters

Repository: beam
Updated Branches:
  refs/heads/master 7ab8954b9 -> a94d680ea


Introduces TypeDescriptors.extractFromTypeParameters


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62c922b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62c922b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62c922b3

Branch: refs/heads/master
Commit: 62c922b3c2dc3163b180f972a7449cf5e6ac501a
Parents: 7ab8954
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 25 17:18:16 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 10:23:15 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  24 ++--
 .../apache/beam/sdk/values/TypeDescriptor.java  |  64 +++++++---
 .../apache/beam/sdk/values/TypeDescriptors.java | 118 ++++++++++++++++++-
 .../beam/sdk/values/TypeDescriptorsTest.java    |  49 ++++++++
 .../io/gcp/bigquery/DynamicDestinations.java    |  22 ++--
 5 files changed, 235 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 9953975..3bf5d5b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verifyNotNull;
 import static org.apache.beam.sdk.io.WriteFiles.UNKNOWN_SHARDNUM;
+import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
@@ -33,7 +34,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
@@ -75,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.joda.time.Instant;
@@ -255,17 +256,16 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
         return destinationCoder;
       }
       // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
-      // We must first use reflection to figure out what the type parameter is.
-      TypeDescriptor<?> superDescriptor =
-          TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
-      if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
-        throw new AssertionError(
-            "Couldn't find the DynamicDestinations superclass of " + this.getClass());
-      }
-      TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
-      @SuppressWarnings("unchecked")
-      TypeDescriptor<DestinationT> descriptor =
-          (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
+      @Nullable TypeDescriptor<DestinationT> descriptor =
+          extractFromTypeParameters(
+              this,
+              DynamicDestinations.class,
+              new TypeVariableExtractor<
+                  DynamicDestinations<UserT, DestinationT>, DestinationT>() {});
+      checkArgument(
+          descriptor != null,
+          "Unable to infer a coder for DestinationT, "
+              + "please specify it explicitly by overriding getDestinationCoder()");
       return registry.getCoder(descriptor);
     }
   }
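
For reference, here is the shape of the new call reduced to a self-contained sketch. The Sink/StringSink/ExtractionSketch names are illustrative, not part of this commit: matching the runtime class of an instance against a parameterized supertype resolves the supertype's type variable, and the helper returns null (rather than an unresolved descriptor) when that variable was erased, which is the case the checkArgument above rejects.

    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;

    public class ExtractionSketch {
      interface Sink<DestinationT> {}

      static class StringSink implements Sink<String> {}

      public static void main(String[] args) {
        // Match StringSink against Sink<DestinationT> and pull out DestinationT.
        TypeDescriptor<String> dest =
            TypeDescriptors.extractFromTypeParameters(
                new StringSink(),
                Sink.class,
                new TypeVariableExtractor<Sink<String>, String>() {});
        System.out.println(dest); // prints "java.lang.String"
      }
    }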

http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
index 14f2cb8..dd6a0fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
@@ -328,30 +328,64 @@ public abstract class TypeDescriptor<T> implements Serializable {
   }
 
   /**
-   * Returns a new {@code TypeDescriptor} where type variables represented by
-   * {@code typeParameter} are substituted by {@code typeDescriptor}. For example, it can be used to
-   * construct {@code Map<K, V>} for any {@code K} and {@code V} type: <pre> {@code
-   *   static <K, V> TypeDescriptor<Map<K, V>> mapOf(
-   *       TypeDescriptor<K> keyType, TypeDescriptor<V> valueType) {
-   *     return new TypeDescriptor<Map<K, V>>() {}
-   *         .where(new TypeParameter<K>() {}, keyType)
-   *         .where(new TypeParameter<V>() {}, valueType);
-   *   }}</pre>
+   * Returns a new {@code TypeDescriptor} where the type variable represented by {@code
+   * typeParameter} is substituted by {@code typeDescriptor}. For example, it can be used to
+   * construct {@code Map<K, V>} for any {@code K} and {@code V} type:
+   *
+   * <pre>{@code
+   * static <K, V> TypeDescriptor<Map<K, V>> mapOf(
+   *     TypeDescriptor<K> keyType, TypeDescriptor<V> valueType) {
+   *   return new TypeDescriptor<Map<K, V>>() {}
+   *       .where(new TypeParameter<K>() {}, keyType)
+   *       .where(new TypeParameter<V>() {}, valueType);
+   * }
+   * }</pre>
    *
    * @param <X> The parameter type
    * @param typeParameter the parameter type variable
    * @param typeDescriptor the actual type to substitute
    */
   @SuppressWarnings("unchecked")
-  public <X> TypeDescriptor<T> where(TypeParameter<X> typeParameter,
-      TypeDescriptor<X> typeDescriptor) {
-    TypeResolver resolver =
-        new TypeResolver()
-            .where(
-                typeParameter.typeVariable, typeDescriptor.getType());
+  public <X> TypeDescriptor<T> where(
+      TypeParameter<X> typeParameter, TypeDescriptor<X> typeDescriptor) {
+    return where(typeParameter.typeVariable, typeDescriptor.getType());
+  }
+
+  /**
+   * A more general form of {@link #where(TypeParameter, TypeDescriptor)} that returns a new {@code
+   * TypeDescriptor} by matching {@code formal} against {@code actual} to resolve type variables in
+   * the current {@link TypeDescriptor}.
+   */
+  @SuppressWarnings("unchecked")
+  public TypeDescriptor<T> where(Type formal, Type actual) {
+    TypeResolver resolver = new TypeResolver().where(formal, actual);
     return (TypeDescriptor<T>) TypeDescriptor.of(resolver.resolveType(token.getType()));
   }
 
+  /**
+   * Returns whether this {@link TypeDescriptor} has any unresolved type parameters, as opposed to
+   * being a concrete type.
+   *
+   * <p>For example:
+   * <pre>{@code
+   *   TypeDescriptor.of(new ArrayList<String>() {}.getClass()).hasUnresolvedParameters()
+   *     => false, because the anonymous class is instantiated with a concrete type
+   *
+   *   class TestUtils {
+   *     static <T> ArrayList<T> createTypeErasedList() {
+   *       return new ArrayList<T>() {};
+   *     }
+   *   }
+   *
+   *   TypeDescriptor.of(TestUtils.<String>createTypeErasedList().getClass()).hasUnresolvedParameters()
+   *     => true, because the type variable T was type-erased and the anonymous ArrayList class
+   *     is instantiated with an unresolved type variable T.
+   * }</pre>
+   */
+  public boolean hasUnresolvedParameters() {
+    return hasUnresolvedParameters(getType());
+  }
+
   @Override
   public String toString() {
     return token.toString();
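
The two Javadoc cases for hasUnresolvedParameters() above, written out as a runnable sketch (ErasureSketch is an illustrative name; the printed values follow the documented behavior):

    import java.util.ArrayList;
    import org.apache.beam.sdk.values.TypeDescriptor;

    public class ErasureSketch {
      static <T> ArrayList<T> createTypeErasedList() {
        return new ArrayList<T>() {};
      }

      public static void main(String[] args) {
        // The anonymous subclass records ArrayList<String> as its generic
        // superclass, so nothing is left unresolved.
        System.out.println(
            TypeDescriptor.of(new ArrayList<String>() {}.getClass())
                .hasUnresolvedParameters()); // false

        // Here the anonymous subclass recorded ArrayList<T>, and T was erased.
        System.out.println(
            TypeDescriptor.of(ErasureSketch.<String>createTypeErasedList().getClass())
                .hasUnresolvedParameters()); // true
      }
    }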

http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index a4626c9..8207f06 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -17,16 +17,20 @@
  */
 package org.apache.beam.sdk.values;
 
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.List;
 import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 
 /**
- * A utility class containing the Java primitives for
- * {@link TypeDescriptor} equivalents. Also, has methods
- * for classes that wrap Java primitives like {@link KV},
- * {@link Set}, {@link List}, and {@link Iterable}.
+ * A utility class for creating {@link TypeDescriptor} objects for different types, such as Java
+ * primitive types, containers and {@link KV KVs} of other {@link TypeDescriptor} objects, and
+ * for extracting type variables of parameterized types (e.g. extracting the {@code OutputT} type
+ * variable of a {@code DoFn<InputT, OutputT>}).
  */
 public class TypeDescriptors {
   /**
@@ -286,4 +290,110 @@ public class TypeDescriptors {
 
     return typeDescriptor;
   }
+
+  /**
+   * A helper interface for use with {@link #extractFromTypeParameters(Object, Class,
+   * TypeVariableExtractor)}.
+   */
+  public interface TypeVariableExtractor<InputT, OutputT> {}
+
+  /**
+   * Extracts a type from the actual type parameters of a parameterized class, subject to Java type
+   * erasure. The type to extract is specified in a way that is safe w.r.t. changing the type
+   * signature of the parameterized class, as opposed to specifying the name or index of a type
+   * variable.
+   *
+   * <p>Example of use:
+   * <pre>{@code
+   *   class Foo<BarT> {
+   *     private SerializableFunction<BarT, String> fn;
+   *
+   *     TypeDescriptor<BarT> inferBarTypeDescriptorFromFn() {
+   *       return TypeDescriptors.extractFromTypeParameters(
+   *         fn,
+   *         SerializableFunction.class,
+   *         // The actual type of "fn" is matched against the input type of the extractor,
+   *         // and the obtained values of type variables of the superclass are substituted
+   *         // into the output type of the extractor.
+   *         new TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>() {});
+   *     }
+   *   }
+   * }</pre>
+   *
+   * @param instance The object being analyzed
+   * @param supertype Parameterized superclass of interest
+   * @param extractor A class for specifying the type to extract from the supertype
+   *
+   * @return A {@link TypeDescriptor} for the actual value of the result type of the extractor,
+   *   or {@code null} if the type was erased.
+   */
+  @SuppressWarnings("unchecked")
+  @Nullable
+  public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
+      T instance, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) {
+    return extractFromTypeParameters(
+        (TypeDescriptor<T>) TypeDescriptor.of(instance.getClass()), supertype, extractor);
+  }
+
+  /**
+   * Like {@link #extractFromTypeParameters(Object, Class, TypeVariableExtractor)}, but takes a
+   * {@link TypeDescriptor} of the instance being analyzed rather than the instance itself.
+   */
+  @SuppressWarnings("unchecked")
+  @Nullable
+  public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
+      TypeDescriptor<T> type, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) {
+    // Get the type signature of the extractor, e.g.
+    // TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>
+    TypeDescriptor<TypeVariableExtractor<T, V>> extractorSupertype =
+        (TypeDescriptor<TypeVariableExtractor<T, V>>)
+            TypeDescriptor.of(extractor.getClass()).getSupertype(TypeVariableExtractor.class);
+
+    // Get the actual type argument, e.g. SerializableFunction<BarT, String>
+    Type inputT = ((ParameterizedType) extractorSupertype.getType()).getActualTypeArguments()[0];
+
+    // Get the actual supertype of the type being analyzed, hopefully with all type parameters
+    // resolved, e.g. SerializableFunction<Integer, String>
+    TypeDescriptor supertypeDescriptor = type.getSupertype(supertype);
+
+    // Substitute actual supertype into the extractor, e.g.
+    // TypeVariableExtractor<SerializableFunction<Integer, String>, Integer>
+    TypeDescriptor<TypeVariableExtractor<T, V>> extractorT =
+        extractorSupertype.where(inputT, supertypeDescriptor.getType());
+
+    // Get output of the extractor.
+    Type outputT = ((ParameterizedType) extractorT.getType()).getActualTypeArguments()[1];
+    TypeDescriptor<?> res = TypeDescriptor.of(outputT);
+    if (res.hasUnresolvedParameters()) {
+      return null;
+    } else {
+      return (TypeDescriptor<V>) res;
+    }
+  }
+
+  /**
+   * Returns a type descriptor for the input of the given {@link SerializableFunction}, subject to
+   * Java type erasure: returns {@code null} if the type was erased.
+   */
+  @Nullable
+  public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
+      SerializableFunction<InputT, OutputT> fn) {
+    return extractFromTypeParameters(
+        fn,
+        SerializableFunction.class,
+        new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, InputT>() {});
+  }
+
+  /**
+   * Returns a type descriptor for the output of the given {@link SerializableFunction}, subject to
+   * Java type erasure: returns {@code null} if the type was erased.
+   */
+  @Nullable
+  public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
+      SerializableFunction<InputT, OutputT> fn) {
+    return extractFromTypeParameters(
+        fn,
+        SerializableFunction.class,
+        new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, OutputT>() {});
+  }
 }
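
A quick illustration of the null contract of the new inputOf/outputOf helpers, mirroring the erasure tests in TypeDescriptorsTest below (InferenceSketch and toStringFn are illustrative names):

    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class InferenceSketch {
      // OutputT is pinned to String, but the input variable T is erased here.
      static <T> SerializableFunction<T, String> toStringFn() {
        return new SerializableFunction<T, String>() {
          @Override
          public String apply(T input) {
            return String.valueOf(input);
          }
        };
      }

      public static void main(String[] args) {
        SerializableFunction<Integer, String> fn = toStringFn();
        System.out.println(TypeDescriptors.outputOf(fn)); // java.lang.String
        System.out.println(TypeDescriptors.inputOf(fn));  // null: T was erased
      }
    }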

http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
index 1bf0fc9..a4f58da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
@@ -25,6 +25,7 @@ import static org.apache.beam.sdk.values.TypeDescriptors.sets;
 import static org.apache.beam.sdk.values.TypeDescriptors.strings;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 
 import java.util.List;
 import java.util.Set;
@@ -70,4 +71,52 @@ public class TypeDescriptorsTest {
     assertNotEquals(descriptor, new TypeDescriptor<List<String>>() {});
     assertNotEquals(descriptor, new TypeDescriptor<List<Boolean>>() {});
   }
+
+  private interface Generic<FooT, BarT> {}
+
+  private static <ActualFooT> Generic<ActualFooT, String> typeErasedGeneric() {
+    return new Generic<ActualFooT, String>() {};
+  }
+
+  private static <ActualFooT, ActualBarT> TypeDescriptor<ActualFooT> extractFooT(
+      Generic<ActualFooT, ActualBarT> instance) {
+    return TypeDescriptors.extractFromTypeParameters(
+        instance,
+        Generic.class,
+        new TypeDescriptors.TypeVariableExtractor<
+            Generic<ActualFooT, ActualBarT>, ActualFooT>() {});
+  }
+
+  private static <ActualFooT, ActualBarT> TypeDescriptor<ActualBarT> extractBarT(
+      Generic<ActualFooT, ActualBarT> instance) {
+    return TypeDescriptors.extractFromTypeParameters(
+        instance,
+        Generic.class,
+        new TypeDescriptors.TypeVariableExtractor<
+            Generic<ActualFooT, ActualBarT>, ActualBarT>() {});
+  }
+
+  private static <ActualFooT, ActualBarT> TypeDescriptor<KV<ActualFooT, ActualBarT>> extractKV(
+      Generic<ActualFooT, ActualBarT> instance) {
+    return TypeDescriptors.extractFromTypeParameters(
+        instance,
+        Generic.class,
+        new TypeDescriptors.TypeVariableExtractor<
+            Generic<ActualFooT, ActualBarT>, KV<ActualFooT, ActualBarT>>() {});
+  }
+
+  @Test
+  public void testTypeDescriptorsTypeParameterOf() throws Exception {
+    assertEquals(strings(), extractFooT(new Generic<String, Integer>() {}));
+    assertEquals(integers(), extractBarT(new Generic<String, Integer>() {}));
+    assertEquals(kvs(strings(), integers()), extractKV(new Generic<String, Integer>() {}));
+  }
+
+  @Test
+  public void testTypeDescriptorsTypeParameterOfErased() throws Exception {
+    Generic<Integer, String> instance = TypeDescriptorsTest.typeErasedGeneric();
+    assertNull(extractFooT(instance));
+    assertEquals(strings(), extractBarT(instance));
+    assertNull(extractKV(instance));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index c5c2462..ea4fc4e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -19,11 +19,11 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters;
 
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /**
@@ -157,17 +158,16 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
       return destinationCoder;
     }
     // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
-    // We must first use reflection to figure out what the type parameter is.
-    TypeDescriptor<?> superDescriptor =
-        TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
-    if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
-      throw new AssertionError(
-          "Couldn't find the DynamicDestinations superclass of " + this.getClass());
-    }
-    TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
-    @SuppressWarnings("unchecked")
     TypeDescriptor<DestinationT> descriptor =
-        (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
+        extractFromTypeParameters(
+            this,
+            DynamicDestinations.class,
+            new TypeDescriptors.TypeVariableExtractor<
+                DynamicDestinations<T, DestinationT>, DestinationT>() {});
+    checkArgument(
+        descriptor != null,
+        "Unable to infer a coder for DestinationT, "
+            + "please specify it explicitly by overriding getDestinationCoder()");
     return registry.getCoder(descriptor);
   }
 }
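
When inference fails here (for example, because the concrete subclass is itself generic in DestinationT), the error message points at the escape hatch: override getDestinationCoder(), which this class exposes as a @Nullable method. A fragment of such an override, assuming a hypothetical subclass whose DestinationT is String (the coder choice is illustrative):

    @Override
    public Coder<String> getDestinationCoder() {
      // Supply the coder explicitly instead of relying on reflective inference.
      return StringUtf8Coder.of();
    }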


[4/4] beam git commit: This closes #3632: [BEAM-2677] AvroIO.parseGenericRecords - schemaless AvroIO.read

Posted by jk...@apache.org.
This closes #3632: [BEAM-2677] AvroIO.parseGenericRecords - schemaless AvroIO.read


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a94d680e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a94d680e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a94d680e

Branch: refs/heads/master
Commit: a94d680eabd47a69be628028799ac4d41bd110d5
Parents: 7ab8954 c16947e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 28 10:25:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 10:25:35 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 199 ++++++++++++++++++-
 .../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++----
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  24 +--
 .../apache/beam/sdk/values/TypeDescriptor.java  |  64 ++++--
 .../apache/beam/sdk/values/TypeDescriptors.java | 118 ++++++++++-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  89 ++++++---
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  30 ++-
 .../beam/sdk/values/TypeDescriptorsTest.java    |  49 +++++
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |   4 +-
 .../io/gcp/bigquery/DynamicDestinations.java    |  22 +-
 .../sdk/io/gcp/bigquery/TransformingSource.java | 136 -------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  68 -------
 12 files changed, 641 insertions(+), 328 deletions(-)
----------------------------------------------------------------------



[2/4] beam git commit: [BEAM-2677] AvroIO.parseGenericRecords - schemaless AvroIO.read

Posted by jk...@apache.org.
[BEAM-2677] AvroIO.parseGenericRecords - schemaless AvroIO.read


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebd00411
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebd00411
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebd00411

Branch: refs/heads/master
Commit: ebd004119c387787d0e0fcd0487e1b2754c7dbc5
Parents: 62c922b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Jul 24 15:07:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 10:25:07 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 199 ++++++++++++++++++-
 .../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++----
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  89 ++++++---
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  30 ++-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |   4 +-
 5 files changed, 406 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 018b84f..27c9073 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -35,7 +35,9 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
@@ -53,13 +55,16 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 
 /**
  * {@link PTransform}s for reading and writing Avro files.
  *
- * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using
- * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. Alternatively, if
- * the filepatterns to be read are themselves in a {@link PCollection}, apply {@link #readAll}.
+ * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
+ * pipeline construction time, use {@code AvroIO.read()}, using {@link AvroIO.Read#from} to specify
+ * the filename or filepattern to read from. Alternatively, if the filepatterns to be read are
+ * themselves in a {@link PCollection}, apply {@link #readAll}.
  *
  * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
  *
@@ -70,6 +75,12 @@ import org.apache.beam.sdk.values.PDone;
  * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link
  * #readAllGenericRecords}.
  *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link
+ * #parseAllGenericRecords}.
+ *
  * <p>For example:
  *
  * <pre>{@code
@@ -84,12 +95,20 @@ import org.apache.beam.sdk.values.PDone;
  * PCollection<GenericRecord> records =
  *     p.apply(AvroIO.readGenericRecords(schema)
  *                .from("gs://my_bucket/path/to/records-*.avro"));
+ *
+ * PCollection<Foo> records =
+ *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
+ *       public Foo apply(GenericRecord record) {
+ *         // If needed, access the schema of the record using record.getSchema()
+ *         return ...;
+ *       }
+ *     }).from("gs://my_bucket/path/to/records-*.avro"));
  * }</pre>
  *
  * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
- * scalability. Note that it may decrease performance if the filepattern matches only a small
- * number of files.
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} or {@link
+ * Parse#withHintMatchesManyFiles} for better performance and scalability. Note that it may decrease
+ * performance if the filepattern matches only a small number of files.
  *
  * <p>Reading from a {@link PCollection} of filepatterns:
  *
@@ -101,6 +120,8 @@ import org.apache.beam.sdk.values.PDone;
  *     filepatterns.apply(AvroIO.read(AvroAutoGenClass.class));
  * PCollection<GenericRecord> genericRecords =
  *     filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * PCollection<Foo> records =
+ *     filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...);
  * }</pre>
  *
  * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
@@ -208,6 +229,29 @@ public class AvroIO {
   }
 
   /**
+   * Reads Avro file(s) containing records of an unspecified schema, converting each record to a
+   * custom type.
+   */
+  public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_Parse.Builder<T>()
+        .setParseFn(parseFn)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
+   * input {@link PCollection}.
+   */
+  public static <T> ParseAll<T> parseAllGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseAll.Builder<T>()
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+        .build();
+  }
+
+  /**
    * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
    * pattern).
    */
@@ -387,6 +431,149 @@ public class AvroIO {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /** Implementation of {@link #parseGenericRecords}. */
+  @AutoValue
+  public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable abstract ValueProvider<String> getFilepattern();
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+    @Nullable abstract Coder<T> getCoder();
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+      abstract Builder<T> setCoder(Coder<T> coder);
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
+
+      abstract Parse<T> build();
+    }
+
+    /** Reads from the given filename or filepattern. */
+    public Parse<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #from(String)}. */
+    public Parse<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets a coder for the result of the parse function. */
+    public Parse<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /** Like {@link Read#withHintMatchesManyFiles()}. */
+    public Parse<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+      if (getHintMatchesManyFiles()) {
+        return input
+            .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+            .apply(parseAllGenericRecords(getParseFn()).withCoder(coder));
+      }
+      return input.apply(
+          org.apache.beam.sdk.io.Read.from(
+              AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
+    }
+
+    private static <T> Coder<T> inferCoder(
+        @Nullable Coder<T> explicitCoder,
+        SerializableFunction<GenericRecord, T> parseFn,
+        CoderRegistry coderRegistry) {
+      if (explicitCoder != null) {
+        return explicitCoder;
+      }
+      // If a coder was not specified explicitly, infer it from parse fn.
+      TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(parseFn);
+      String message =
+          "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().";
+      checkArgument(descriptor != null, message);
+      try {
+        return coderRegistry.getCoder(descriptor);
+      } catch (CannotProvideCoderException e) {
+        throw new IllegalArgumentException(message, e);
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #parseAllGenericRecords}. */
+  @AutoValue
+  public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+    @Nullable abstract Coder<T> getCoder();
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+      abstract Builder<T> setCoder(Coder<T> coder);
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ParseAll<T> build();
+    }
+
+    /** Specifies the coder for the result of the {@code parseFn}. */
+    public ParseAll<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @VisibleForTesting
+    ParseAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<String> input) {
+      final Coder<T> coder =
+          Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+      SerializableFunction<String, FileBasedSource<T>> createSource =
+          new SerializableFunction<String, FileBasedSource<T>>() {
+            @Override
+            public FileBasedSource<T> apply(String input) {
+              return AvroSource.from(input).withParseFn(getParseFn(), coder);
+            }
+          };
+      return input
+          .apply(
+              "Parse all via FileBasedSource",
+              new ReadAllViaFileBasedSource<>(
+                  SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
+                  getDesiredBundleSizeBytes(),
+                  createSource))
+          .setCoder(coder);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
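
Taken together, Parse.inferCoder above makes withCoder() optional in the common case: an anonymous parse function pins OutputT, TypeDescriptors.outputOf resolves it, and the coder registry supplies the coder. A sketch (the pipeline p, the bucket path, and the "name" field are assumed, as in the class Javadoc):

    // OutputT = String is concrete in the anonymous class, so the registry
    // can supply StringUtf8Coder; withCoder() would only be needed if the
    // parse fn came out of a generic factory method and OutputT was erased.
    PCollection<String> names =
        p.apply(
            AvroIO.parseGenericRecords(
                    new SerializableFunction<GenericRecord, String>() {
                      @Override
                      public String apply(GenericRecord record) {
                        return record.get("name").toString();
                      }
                    })
                .from("gs://my_bucket/path/to/records-*.avro"));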

http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index a98d870..d277503 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -27,8 +28,10 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
 import java.io.ObjectStreamException;
 import java.io.PushbackInputStream;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
@@ -53,10 +56,12 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
@@ -130,19 +135,84 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   // The default sync interval is 64k.
   private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
 
-  // The type of the records contained in the file.
-  private final Class<T> type;
+  // Use cases of AvroSource are:
+  // 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema.
+  // 2) AvroSource<Foo> Reading records of a generated Avro class Foo.
+  // 3) AvroSource<T> Reading GenericRecord records with an unspecified schema
+  //    and converting them to type T.
+  //                     |    Case 1     |    Case 2   |     Case 3    |
+  // type                | GenericRecord |     Foo     | GenericRecord |
+  // readerSchemaString  |    non-null   |   non-null  |     null      |
+  // parseFn             |      null     |     null    |   non-null    |
+  // outputCoder         |      null     |     null    |   non-null    |
+  private static class Mode<T> implements Serializable {
+    private final Class<?> type;
+
+    // The JSON schema used to decode records.
+    @Nullable
+    private String readerSchemaString;
+
+    @Nullable
+    private final SerializableFunction<GenericRecord, T> parseFn;
+
+    @Nullable
+    private final Coder<T> outputCoder;
+
+    private Mode(
+        Class<?> type,
+        @Nullable String readerSchemaString,
+        @Nullable SerializableFunction<GenericRecord, T> parseFn,
+        @Nullable Coder<T> outputCoder) {
+      this.type = type;
+      this.readerSchemaString = internSchemaString(readerSchemaString);
+      this.parseFn = parseFn;
+      this.outputCoder = outputCoder;
+    }
 
-  // The JSON schema used to decode records.
-  @Nullable
-  private final String readerSchemaString;
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
+      is.defaultReadObject();
+      readerSchemaString = internSchemaString(readerSchemaString);
+    }
+
+    private Coder<T> getOutputCoder() {
+      if (parseFn == null) {
+        return AvroCoder.of((Class<T>) type, internOrParseSchemaString(readerSchemaString));
+      } else {
+        return outputCoder;
+      }
+    }
+
+    private void validate() {
+      if (parseFn == null) {
+        checkArgument(
+            readerSchemaString != null,
+            "schema must be specified using withSchema() when not using a parse fn");
+      }
+    }
+  }
+
+  private static Mode<GenericRecord> readGenericRecordsWithSchema(String schema) {
+    return new Mode<>(GenericRecord.class, schema, null, null);
+  }
+  private static <T> Mode<T> readGeneratedClasses(Class<T> clazz) {
+    return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(), null, null);
+  }
+  private static <T> Mode<T> parseGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn, Coder<T> outputCoder) {
+    return new Mode<>(GenericRecord.class, null, parseFn, outputCoder);
+  }
+
+  private final Mode<T> mode;
 
   /**
-   * Reads from the given file name or pattern ("glob"). The returned source can be further
+   * Reads from the given file name or pattern ("glob"). The returned source needs to be further
    * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
    */
   public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
-    return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class);
+    return new AvroSource<>(
+        fileNameOrPattern,
+        DEFAULT_MIN_BUNDLE_SIZE,
+        readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
   }
 
   /** Like {@link #from(ValueProvider)}. */
@@ -152,23 +222,40 @@ public class AvroSource<T> extends BlockBasedSource<T> {
 
   /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
+    checkNotNull(schema, "schema");
     return new AvroSource<>(
-        getFileOrPatternSpecProvider(), getMinBundleSize(), schema, GenericRecord.class);
+        getFileOrPatternSpecProvider(),
+        getMinBundleSize(),
+        readGenericRecordsWithSchema(schema));
   }
 
   /** Like {@link #withSchema(String)}. */
   public AvroSource<GenericRecord> withSchema(Schema schema) {
-    return new AvroSource<>(
-        getFileOrPatternSpecProvider(), getMinBundleSize(), schema.toString(), GenericRecord.class);
+    checkNotNull(schema, "schema");
+    return withSchema(schema.toString());
   }
 
   /** Reads files containing records of the given class. */
   public <X> AvroSource<X> withSchema(Class<X> clazz) {
+    checkNotNull(clazz, "clazz");
+    return new AvroSource<>(
+        getFileOrPatternSpecProvider(),
+        getMinBundleSize(),
+        readGeneratedClasses(clazz));
+  }
+
+  /**
+   * Reads {@link GenericRecord} of unspecified schema and maps them to instances of a custom type
+   * using the given {@code parseFn} and encoded using the given coder.
+   */
+  public <X> AvroSource<X> withParseFn(
+      SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {
+    checkNotNull(parseFn, "parseFn");
+    checkNotNull(coder, "coder");
     return new AvroSource<>(
         getFileOrPatternSpecProvider(),
         getMinBundleSize(),
-        ReflectData.get().getSchema(clazz).toString(),
-        clazz);
+        parseGenericRecords(parseFn, coder));
   }
 
   /**
@@ -176,19 +263,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
-    return new AvroSource<>(
-        getFileOrPatternSpecProvider(), minBundleSize, readerSchemaString, type);
+    return new AvroSource<>(getFileOrPatternSpecProvider(), minBundleSize, mode);
   }
 
   /** Constructor for FILEPATTERN mode. */
   private AvroSource(
       ValueProvider<String> fileNameOrPattern,
       long minBundleSize,
-      String readerSchemaString,
-      Class<T> type) {
+      Mode<T> mode) {
     super(fileNameOrPattern, minBundleSize);
-    this.readerSchemaString = internSchemaString(readerSchemaString);
-    this.type = type;
+    this.mode = mode;
   }
 
   /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
@@ -197,18 +281,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       long minBundleSize,
       long startOffset,
       long endOffset,
-      String readerSchemaString,
-      Class<T> type) {
+      Mode<T> mode) {
     super(metadata, minBundleSize, startOffset, endOffset);
-    this.readerSchemaString = internSchemaString(readerSchemaString);
-    this.type = type;
+    this.mode = mode;
   }
 
   @Override
   public void validate() {
-    // AvroSource objects do not need to be configured with more than a file pattern. Overridden to
-    // make this explicit.
     super.validate();
+    mode.validate();
   }
 
   /**
@@ -225,7 +306,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
 
   @Override
   public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
-    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readerSchemaString, type);
+    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, mode);
   }
 
   @Override
@@ -234,14 +315,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   }
 
   @Override
-  public AvroCoder<T> getDefaultOutputCoder() {
-    return AvroCoder.of(type, internOrParseSchemaString(readerSchemaString));
+  public Coder<T> getDefaultOutputCoder() {
+    return mode.getOutputCoder();
   }
 
   @VisibleForTesting
   @Nullable
   String getReaderSchemaString() {
-    return readerSchemaString;
+    return mode.readerSchemaString;
   }
 
   /** Avro file metadata. */
@@ -380,15 +461,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     switch (getMode()) {
       case SINGLE_FILE_OR_SUBRANGE:
         return new AvroSource<>(
-            getSingleFileMetadata(),
-            getMinBundleSize(),
-            getStartOffset(),
-            getEndOffset(),
-            readerSchemaString,
-            type);
+            getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
       case FILEPATTERN:
-        return new AvroSource<>(
-            getFileOrPatternSpecProvider(), getMinBundleSize(), readerSchemaString, type);
+        return new AvroSource<>(getFileOrPatternSpecProvider(), getMinBundleSize(), mode);
         default:
           throw new InvalidObjectException(
               String.format("Unknown mode %s for AvroSource %s", getMode(), this));
@@ -402,6 +477,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    */
   @Experimental(Experimental.Kind.SOURCE_SINK)
   static class AvroBlock<T> extends Block<T> {
+    private final Mode<T> mode;
+
     // The number of records in the block.
     private final long numRecords;
 
@@ -412,7 +489,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     private long currentRecordIndex = 0;
 
     // A DatumReader to read records from the block.
-    private final DatumReader<T> reader;
+    private final DatumReader<?> reader;
 
     // A BinaryDecoder used by the reader to decode records.
     private final BinaryDecoder decoder;
@@ -455,19 +532,19 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     AvroBlock(
         byte[] data,
         long numRecords,
-        Class<? extends T> type,
-        String readerSchemaString,
+        Mode<T> mode,
         String writerSchemaString,
         String codec)
         throws IOException {
+      this.mode = mode;
       this.numRecords = numRecords;
       checkNotNull(writerSchemaString, "writerSchemaString");
       Schema writerSchema = internOrParseSchemaString(writerSchemaString);
       Schema readerSchema =
           internOrParseSchemaString(
-              MoreObjects.firstNonNull(readerSchemaString, writerSchemaString));
+              MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString));
       this.reader =
-          (type == GenericRecord.class)
+          (mode.type == GenericRecord.class)
               ? new GenericDatumReader<T>(writerSchema, readerSchema)
               : new ReflectDatumReader<T>(writerSchema, readerSchema);
       this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
@@ -483,7 +560,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       if (currentRecordIndex >= numRecords) {
         return false;
       }
-      currentRecord = reader.read(null, decoder);
+      Object record = reader.read(null, decoder);
+      currentRecord =
+          (mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record);
       currentRecordIndex++;
       return true;
     }
@@ -585,8 +664,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
           new AvroBlock<>(
               data,
               numRecords,
-              getCurrentSource().type,
-              getCurrentSource().readerSchemaString,
+              getCurrentSource().mode,
               metadata.getSchemaString(),
               metadata.getCodec());
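
The three columns of the Mode table above map one-to-one onto the public configuration paths of AvroSource; a sketch (Foo stands in for a generated Avro class, jsonSchema and parseFn for locals of the right types):

    // Case 1: GenericRecord records decoded with an explicit reader schema.
    AvroSource<GenericRecord> generic =
        AvroSource.from("gs://bucket/records-*.avro").withSchema(jsonSchema);

    // Case 2: records of a generated (or reflect-mapped) class Foo.
    AvroSource<Foo> typed =
        AvroSource.from("gs://bucket/records-*.avro").withSchema(Foo.class);

    // Case 3: schemaless read; each GenericRecord is converted by parseFn, and
    // the output coder must be given explicitly since no schema is known.
    AvroSource<Foo> parsed =
        AvroSource.from("gs://bucket/records-*.avro")
            .withParseFn(parseFn, AvroCoder.of(Foo.class));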
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 90cd824..154ff5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -114,9 +115,9 @@ public class AvroIOTest {
 
     public GenericClass() {}
 
-    public GenericClass(int intValue, String stringValue) {
-      this.intField = intValue;
-      this.stringField = stringValue;
+    public GenericClass(int intField, String stringField) {
+      this.intField = intField;
+      this.stringField = stringField;
     }
 
     @Override
@@ -142,9 +143,18 @@ public class AvroIOTest {
     }
   }
 
+  private static class ParseGenericClass
+      implements SerializableFunction<GenericRecord, GenericClass> {
+    @Override
+    public GenericClass apply(GenericRecord input) {
+      return new GenericClass(
+          (int) input.get("intField"), input.get("stringField").toString());
+    }
+  }
+
   @Test
   @Category(NeedsRunner.class)
-  public void testAvroIOWriteAndReadASingleFile() throws Throwable {
+  public void testAvroIOWriteAndReadAndParseASingleFile() throws Throwable {
     List<GenericClass> values =
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -153,23 +163,45 @@ public class AvroIOTest {
         .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
     writePipeline.run().waitUntilFinish();
 
-    // Test the same data via read(), read().withHintMatchesManyFiles(), and readAll()
+    // Test the same data using all versions of read().
+    PCollection<String> path =
+        readPipeline.apply("Create path", Create.of(outputFile.getAbsolutePath()));
     PAssert.that(
-            readPipeline.apply(
-                "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+        readPipeline.apply(
+            "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
         .containsInAnyOrder(values);
     PAssert.that(
-            readPipeline.apply(
-                "Read withHintMatchesManyFiles",
-                AvroIO.read(GenericClass.class)
-                    .from(outputFile.getAbsolutePath())
-                    .withHintMatchesManyFiles()))
+        readPipeline.apply(
+            "Read withHintMatchesManyFiles",
+            AvroIO.read(GenericClass.class)
+                .from(outputFile.getAbsolutePath())
+                .withHintMatchesManyFiles()))
         .containsInAnyOrder(values);
     PAssert.that(
-            "ReadAll",
-            readPipeline
-                .apply(Create.of(outputFile.getAbsolutePath()))
-                .apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        path.apply(
+            "ReadAll", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(values);
+    PAssert.that(
+        readPipeline.apply(
+            "Parse",
+            AvroIO.parseGenericRecords(new ParseGenericClass())
+                .from(outputFile.getAbsolutePath())
+                .withCoder(AvroCoder.of(GenericClass.class))))
+        .containsInAnyOrder(values);
+    PAssert.that(
+        readPipeline.apply(
+            "Parse withHintMatchesManyFiles",
+            AvroIO.parseGenericRecords(new ParseGenericClass())
+                .from(outputFile.getAbsolutePath())
+                .withCoder(AvroCoder.of(GenericClass.class))
+                .withHintMatchesManyFiles()))
+        .containsInAnyOrder(values);
+    PAssert.that(
+        path.apply(
+            "ParseAll",
+            AvroIO.parseAllGenericRecords(new ParseGenericClass())
+                .withCoder(AvroCoder.of(GenericClass.class))
+                .withDesiredBundleSizeBytes(10)))
         .containsInAnyOrder(values);
 
     readPipeline.run();
@@ -200,7 +232,7 @@ public class AvroIOTest {
                 .withNumShards(3));
     writePipeline.run().waitUntilFinish();
 
-    // Test both read() and readAll()
+    // Test read(), readAll(), and parseAllGenericRecords().
     PAssert.that(
             readPipeline.apply(
                 "Read first",
@@ -213,15 +245,22 @@ public class AvroIOTest {
                 AvroIO.read(GenericClass.class)
                     .from(tmpFolder.getRoot().getAbsolutePath() + "/second*")))
         .containsInAnyOrder(secondValues);
+    PCollection<String> paths =
+        readPipeline.apply(
+            "Create paths",
+            Create.of(
+                tmpFolder.getRoot().getAbsolutePath() + "/first*",
+                tmpFolder.getRoot().getAbsolutePath() + "/second*"));
+    PAssert.that(
+            paths.apply(
+                "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
     PAssert.that(
-            readPipeline
-                .apply(
-                    "Create paths",
-                    Create.of(
-                        tmpFolder.getRoot().getAbsolutePath() + "/first*",
-                        tmpFolder.getRoot().getAbsolutePath() + "/second*"))
-                .apply(
-                    "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+            paths.apply(
+                "Parse all",
+                AvroIO.parseAllGenericRecords(new ParseGenericClass())
+                    .withCoder(AvroCoder.of(GenericClass.class))
+                    .withDesiredBundleSizeBytes(10)))
         .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
 
     readPipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index bf2ac95..714e029 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.hamcrest.Matchers;
@@ -407,11 +408,6 @@ public class AvroSourceTest {
     source = AvroSource.from(filename).withSchema(schemaString);
     records = SourceTestUtils.readFromSource(source, null);
     assertEqualsWithGeneric(expected, records);
-
-    // Create a source with no schema
-    source = AvroSource.from(filename);
-    records = SourceTestUtils.readFromSource(source, null);
-    assertEqualsWithGeneric(expected, records);
   }
 
   @Test
@@ -449,6 +445,30 @@ public class AvroSourceTest {
     assertSame(sourceA.getReaderSchemaString(), sourceC.getReaderSchemaString());
   }
 
+  @Test
+  public void testParseFn() throws Exception {
+    List<Bird> expected = createRandomRecords(100);
+    String filename = generateTestFile("tmp.avro", expected, SyncBehavior.SYNC_DEFAULT, 0,
+        AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
+
+    AvroSource<Bird> source =
+        AvroSource.from(filename)
+            .withParseFn(
+                new SerializableFunction<GenericRecord, Bird>() {
+                  @Override
+                  public Bird apply(GenericRecord input) {
+                    return new Bird(
+                        (long) input.get("number"),
+                        input.get("species").toString(),
+                        input.get("quality").toString(),
+                        (long) input.get("quantity"));
+                  }
+                },
+                AvroCoder.of(Bird.class));
+    List<Bird> actual = SourceTestUtils.readFromSource(source, null);
+    assertThat(actual, containsInAnyOrder(expected.toArray()));
+  }
+
   private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) {
     assertEquals(expected.size(), actual.size());
     for (int i = 0; i < expected.size(); i++) {
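
The new withParseFn() hook is not test-only; a sketch of wiring the parsing source into a pipeline through the generic Read.from() adapter (p is a Pipeline, and parseFn stands for the same GenericRecord-to-Bird function as in testParseFn above):

    AvroSource<Bird> source =
        AvroSource.from("/path/to/birds-*.avro")
            .withParseFn(parseFn, AvroCoder.of(Bird.class));
    PCollection<Bird> birds = p.apply(Read.from(source));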

http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 2b1eafe..6c118a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -183,8 +183,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
 
     List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
     for (ResourceId file : files) {
-      avroSources.add(new TransformingSource<>(
-          AvroSource.from(file.toString()), function, getDefaultOutputCoder()));
+      avroSources.add(
+          AvroSource.from(file.toString()).withParseFn(function, getDefaultOutputCoder()));
     }
     return ImmutableList.copyOf(avroSources);
   }
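
The wrapper layer is no longer needed here: AvroSource applies the parse function itself, so splitting and progress reporting stay within a single source instead of being forwarded through a delegate (the delegate class and its tests are removed in the next commit). For context, function here is a SerializableFunction<GenericRecord, TableRow>; a simplified sketch of such a conversion, with a hypothetical field name:

    SerializableFunction<GenericRecord, TableRow> function =
        new SerializableFunction<GenericRecord, TableRow>() {
          @Override
          public TableRow apply(GenericRecord record) {
            // Copy an Avro field into the corresponding TableRow column.
            return new TableRow().set("field", record.get("field").toString());
          }
        };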


[3/4] beam git commit: Removes TransformingSource, which is now unused

Posted by jk...@apache.org.
Removes TransformingSource, which is now unused


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c16947ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c16947ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c16947ec

Branch: refs/heads/master
Commit: c16947ec01d21ef99a5e2024d7aaead3c7a4399f
Parents: ebd0041
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 25 23:35:00 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 10:25:07 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/TransformingSource.java | 136 -------------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  68 ----------
 2 files changed, 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c16947ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
deleted file mode 100644
index b8e6b39..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.joda.time.Instant;
-
-/**
- * A {@link BoundedSource} that reads from {@code BoundedSource<T>}
- * and transforms elements to type {@code V}.
-*/
-@VisibleForTesting
-class TransformingSource<T, V> extends BoundedSource<V> {
-  private final BoundedSource<T> boundedSource;
-  private final SerializableFunction<T, V> function;
-  private final Coder<V> outputCoder;
-
-  TransformingSource(
-      BoundedSource<T> boundedSource,
-      SerializableFunction<T, V> function,
-      Coder<V> outputCoder) {
-    this.boundedSource = checkNotNull(boundedSource, "boundedSource");
-    this.function = checkNotNull(function, "function");
-    this.outputCoder = checkNotNull(outputCoder, "outputCoder");
-  }
-
-  @Override
-  public List<? extends BoundedSource<V>> split(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    return Lists.transform(
-        boundedSource.split(desiredBundleSizeBytes, options),
-        new Function<BoundedSource<T>, BoundedSource<V>>() {
-          @Override
-          public BoundedSource<V> apply(BoundedSource<T> input) {
-            return new TransformingSource<>(input, function, outputCoder);
-          }
-        });
-  }
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-    return boundedSource.getEstimatedSizeBytes(options);
-  }
-
-  @Override
-  public BoundedReader<V> createReader(PipelineOptions options) throws IOException {
-    return new TransformingReader(boundedSource.createReader(options));
-  }
-
-  @Override
-  public void validate() {
-    boundedSource.validate();
-  }
-
-  @Override
-  public Coder<V> getDefaultOutputCoder() {
-    return outputCoder;
-  }
-
-  private class TransformingReader extends BoundedReader<V> {
-    private final BoundedReader<T> boundedReader;
-
-    private TransformingReader(BoundedReader<T> boundedReader) {
-      this.boundedReader = checkNotNull(boundedReader, "boundedReader");
-    }
-
-    @Override
-    public synchronized BoundedSource<V> getCurrentSource() {
-      return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder);
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return boundedReader.start();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      return boundedReader.advance();
-    }
-
-    @Override
-    public V getCurrent() throws NoSuchElementException {
-      T current = boundedReader.getCurrent();
-      return function.apply(current);
-    }
-
-    @Override
-    public void close() throws IOException {
-      boundedReader.close();
-    }
-
-    @Override
-    public synchronized BoundedSource<V> splitAtFraction(double fraction) {
-      BoundedSource<T> split = boundedReader.splitAtFraction(fraction);
-      return split == null ? null : new TransformingSource<>(split, function, outputCoder);
-    }
-
-    @Override
-    public Double getFractionConsumed() {
-      return boundedReader.getFractionConsumed();
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return boundedReader.getCurrentTimestamp();
-    }
-  }
-}
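
With the class deleted, the equivalent for Avro inputs is expressed natively on the source; for non-Avro bounded sources, a MapElements step after the read is the usual substitute. A sketch, where fn and coder stand for the parse function and output coder that would previously have been handed to the wrapper:

    // Before: new TransformingSource<>(AvroSource.from(pattern), fn, coder)
    // After:
    AvroSource<V> source = AvroSource.from(pattern).withParseFn(fn, coder);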

http://git-wip-us.apache.org/repos/asf/beam/blob/c16947ec/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 3465b4e..8db4e94 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -86,7 +86,6 @@ import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -1510,8 +1509,6 @@ public class BigQueryIOTest implements Serializable {
     // Simulate a repeated call to split(), like a Dataflow worker will sometimes do.
     sources = bqSource.split(200, options);
     assertEquals(2, sources.size());
-    BoundedSource<TableRow> actual = sources.get(0);
-    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
 
     // A repeated call to split() should not have caused a duplicate extract job.
     assertEquals(1, fakeJobService.getNumExtractJobCalls());
@@ -1594,8 +1591,6 @@ public class BigQueryIOTest implements Serializable {
 
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(2, sources.size());
-    BoundedSource<TableRow> actual = sources.get(0);
-    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
   }
 
   @Test
@@ -1673,69 +1668,6 @@ public class BigQueryIOTest implements Serializable {
 
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(2, sources.size());
-    BoundedSource<TableRow> actual = sources.get(0);
-    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
-  }
-
-  @Test
-  public void testTransformingSource() throws Exception {
-    int numElements = 10000;
-    @SuppressWarnings("deprecation")
-    BoundedSource<Long> longSource = CountingSource.upTo(numElements);
-    SerializableFunction<Long, String> toStringFn =
-        new SerializableFunction<Long, String>() {
-          @Override
-          public String apply(Long input) {
-            return input.toString();
-         }};
-    BoundedSource<String> stringSource = new TransformingSource<>(
-        longSource, toStringFn, StringUtf8Coder.of());
-
-    List<String> expected = Lists.newArrayList();
-    for (int i = 0; i < numElements; i++) {
-      expected.add(String.valueOf(i));
-    }
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(stringSource, options),
-        CoreMatchers.is(expected));
-    SourceTestUtils.assertSplitAtFractionBehavior(
-        stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options);
-
-    SourceTestUtils.assertSourcesEqualReferenceSource(
-        stringSource, stringSource.split(100, options), options);
-  }
-
-  @Test
-  public void testTransformingSourceUnsplittable() throws Exception {
-    int numElements = 10000;
-    @SuppressWarnings("deprecation")
-    BoundedSource<Long> longSource =
-        SourceTestUtils.toUnsplittableSource(CountingSource.upTo(numElements));
-    SerializableFunction<Long, String> toStringFn =
-        new SerializableFunction<Long, String>() {
-          @Override
-          public String apply(Long input) {
-            return input.toString();
-          }
-        };
-    BoundedSource<String> stringSource =
-        new TransformingSource<>(longSource, toStringFn, StringUtf8Coder.of());
-
-    List<String> expected = Lists.newArrayList();
-    for (int i = 0; i < numElements; i++) {
-      expected.add(String.valueOf(i));
-    }
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(stringSource, options), CoreMatchers.is(expected));
-    SourceTestUtils.assertSplitAtFractionBehavior(
-        stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-
-    SourceTestUtils.assertSourcesEqualReferenceSource(
-        stringSource, stringSource.split(100, options), options);
   }
 
   @Test