You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/28 17:53:05 UTC
[1/4] beam git commit: Introduces
TypeDescriptors.extractFromTypeParameters
Repository: beam
Updated Branches:
refs/heads/master 7ab8954b9 -> a94d680ea
Introduces TypeDescriptors.extractFromTypeParameters
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62c922b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62c922b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62c922b3
Branch: refs/heads/master
Commit: 62c922b3c2dc3163b180f972a7449cf5e6ac501a
Parents: 7ab8954
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 25 17:18:16 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 10:23:15 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 24 ++--
.../apache/beam/sdk/values/TypeDescriptor.java | 64 +++++++---
.../apache/beam/sdk/values/TypeDescriptors.java | 118 ++++++++++++++++++-
.../beam/sdk/values/TypeDescriptorsTest.java | 49 ++++++++
.../io/gcp/bigquery/DynamicDestinations.java | 22 ++--
5 files changed, 235 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 9953975..3bf5d5b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verifyNotNull;
import static org.apache.beam.sdk.io.WriteFiles.UNKNOWN_SHARDNUM;
+import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@@ -33,7 +34,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
@@ -75,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
import org.joda.time.Instant;
@@ -255,17 +256,16 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
return destinationCoder;
}
// If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
- // We must first use reflection to figure out what the type parameter is.
- TypeDescriptor<?> superDescriptor =
- TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
- if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
- throw new AssertionError(
- "Couldn't find the DynamicDestinations superclass of " + this.getClass());
- }
- TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
- @SuppressWarnings("unchecked")
- TypeDescriptor<DestinationT> descriptor =
- (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
+ @Nullable TypeDescriptor<DestinationT> descriptor =
+ extractFromTypeParameters(
+ this,
+ DynamicDestinations.class,
+ new TypeVariableExtractor<
+ DynamicDestinations<UserT, DestinationT>, DestinationT>() {});
+ checkArgument(
+ descriptor != null,
+ "Unable to infer a coder for DestinationT, "
+ + "please specify it explicitly by overriding getDestinationCoder()");
return registry.getCoder(descriptor);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
index 14f2cb8..dd6a0fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java
@@ -328,30 +328,64 @@ public abstract class TypeDescriptor<T> implements Serializable {
}
/**
- * Returns a new {@code TypeDescriptor} where type variables represented by
- * {@code typeParameter} are substituted by {@code typeDescriptor}. For example, it can be used to
- * construct {@code Map<K, V>} for any {@code K} and {@code V} type: <pre> {@code
- * static <K, V> TypeDescriptor<Map<K, V>> mapOf(
- * TypeDescriptor<K> keyType, TypeDescriptor<V> valueType) {
- * return new TypeDescriptor<Map<K, V>>() {}
- * .where(new TypeParameter<K>() {}, keyType)
- * .where(new TypeParameter<V>() {}, valueType);
- * }}</pre>
+ * Returns a new {@code TypeDescriptor} where the type variable represented by {@code
+ * typeParameter} is substituted by {@code typeDescriptor}. For example, it can be used to
+ * construct {@code Map<K, V>} for any {@code K} and {@code V} type:
+ *
+ * <pre>{@code
+ * static <K, V> TypeDescriptor<Map<K, V>> mapOf(
+ * TypeDescriptor<K> keyType, TypeDescriptor<V> valueType) {
+ * return new TypeDescriptor<Map<K, V>>() {}
+ * .where(new TypeParameter<K>() {}, keyType)
+ * .where(new TypeParameter<V>() {}, valueType);
+ * }
+ * }</pre>
*
* @param <X> The parameter type
* @param typeParameter the parameter type variable
* @param typeDescriptor the actual type to substitute
*/
@SuppressWarnings("unchecked")
- public <X> TypeDescriptor<T> where(TypeParameter<X> typeParameter,
- TypeDescriptor<X> typeDescriptor) {
- TypeResolver resolver =
- new TypeResolver()
- .where(
- typeParameter.typeVariable, typeDescriptor.getType());
+ public <X> TypeDescriptor<T> where(
+ TypeParameter<X> typeParameter, TypeDescriptor<X> typeDescriptor) {
+ return where(typeParameter.typeVariable, typeDescriptor.getType());
+ }
+
+ /**
+ * A more general form of {@link #where(TypeParameter, TypeDescriptor)} that returns a new {@code
+ * TypeDescriptor} by matching {@code formal} against {@code actual} to resolve type variables in
+ * the current {@link TypeDescriptor}.
+ */
+ @SuppressWarnings("unchecked")
+ public TypeDescriptor<T> where(Type formal, Type actual) {
+ TypeResolver resolver = new TypeResolver().where(formal, actual);
return (TypeDescriptor<T>) TypeDescriptor.of(resolver.resolveType(token.getType()));
}
+ /**
+ * Returns whether this {@link TypeDescriptor} has any unresolved type parameters, as opposed to
+ * being a concrete type.
+ *
+ * <p>For example:
+ * <pre>{@code
+ * TypeDescriptor.of(new ArrayList<String>() {}.getClass()).hasUnresolvedParameters()
+ * => false, because the anonymous class is instantiated with a concrete type
+ *
+ * class TestUtils {
+ * static <T> ArrayList<T> createTypeErasedList() {
+ * return new ArrayList<T>() {};
+ * }
+ * }
+ *
+ * TypeDescriptor.of(TestUtils.<String>createTypeErasedList().getClass()).hasUnresolvedParameters()
+ * => true, because the type variable T got type-erased and the anonymous ArrayList class
+ * is instantiated with an unresolved type variable T.
+ * }</pre>
+ */
+ public boolean hasUnresolvedParameters() {
+ return hasUnresolvedParameters(getType());
+ }
+
@Override
public String toString() {
return token.toString();
http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index a4626c9..8207f06 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -17,16 +17,20 @@
*/
package org.apache.beam.sdk.values;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.List;
import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.SerializableFunction;
/**
- * A utility class containing the Java primitives for
- * {@link TypeDescriptor} equivalents. Also, has methods
- * for classes that wrap Java primitives like {@link KV},
- * {@link Set}, {@link List}, and {@link Iterable}.
+ * A utility class for creating {@link TypeDescriptor} objects for different types, such as Java
+ * primitive types, containers and {@link KV KVs} of other {@link TypeDescriptor} objects, and
+ * extracting type variables of parameterized types (e.g. extracting the {@code OutputT} type
+ * variable of a {@code DoFn<InputT, OutputT>}).
*/
public class TypeDescriptors {
/**
@@ -286,4 +290,110 @@ public class TypeDescriptors {
return typeDescriptor;
}
+
+ /**
+ * A helper interface for use with {@link #extractFromTypeParameters(Object, Class,
+ * TypeVariableExtractor)}.
+ */
+ public interface TypeVariableExtractor<InputT, OutputT> {}
+
+ /**
+ * Extracts a type from the actual type parameters of a parameterized class, subject to Java type
+ * erasure. The type to extract is specified in a way that is safe w.r.t. changing the type
+ * signature of the parameterized class, as opposed to specifying the name or index of a type
+ * variable.
+ *
+ * <p>Example of use:
+ * <pre>{@code
+ * class Foo<BarT> {
+ * private SerializableFunction<BarT, String> fn;
+ *
+ * TypeDescriptor<BarT> inferBarTypeDescriptorFromFn() {
+ * return TypeDescriptors.extractFromTypeParameters(
+ * fn,
+ * SerializableFunction.class,
+ * // The actual type of "fn" is matched against the input type of the extractor,
+ * // and the obtained values of type variables of the superclass are substituted
+ * // into the output type of the extractor.
+ * new TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>() {});
+ * }
+ * }
+ * }</pre>
+ *
+ * @param instance The object being analyzed
+ * @param supertype Parameterized superclass of interest
+ * @param extractor A class for specifying the type to extract from the supertype
+ *
+ * @return A {@link TypeDescriptor} for the actual value of the result type of the extractor,
+ * or {@code null} if the type was erased.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable
+ public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
+ T instance, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) {
+ return extractFromTypeParameters(
+ (TypeDescriptor<T>) TypeDescriptor.of(instance.getClass()), supertype, extractor);
+ }
+
+ /**
+ * Like {@link #extractFromTypeParameters(Object, Class, TypeVariableExtractor)}, but takes a
+ * {@link TypeDescriptor} of the instance being analyzed rather than the instance itself.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable
+ public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
+ TypeDescriptor<T> type, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) {
+ // Get the type signature of the extractor, e.g.
+ // TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>
+ TypeDescriptor<TypeVariableExtractor<T, V>> extractorSupertype =
+ (TypeDescriptor<TypeVariableExtractor<T, V>>)
+ TypeDescriptor.of(extractor.getClass()).getSupertype(TypeVariableExtractor.class);
+
+ // Get the actual type argument, e.g. SerializableFunction<BarT, String>
+ Type inputT = ((ParameterizedType) extractorSupertype.getType()).getActualTypeArguments()[0];
+
+ // Get the actual supertype of the type being analyzed, hopefully with all type parameters
+ // resolved, e.g. SerializableFunction<Integer, String>
+ TypeDescriptor supertypeDescriptor = type.getSupertype(supertype);
+
+ // Substitute actual supertype into the extractor, e.g.
+ // TypeVariableExtractor<SerializableFunction<Integer, String>, Integer>
+ TypeDescriptor<TypeVariableExtractor<T, V>> extractorT =
+ extractorSupertype.where(inputT, supertypeDescriptor.getType());
+
+ // Get output of the extractor.
+ Type outputT = ((ParameterizedType) extractorT.getType()).getActualTypeArguments()[1];
+ TypeDescriptor<?> res = TypeDescriptor.of(outputT);
+ if (res.hasUnresolvedParameters()) {
+ return null;
+ } else {
+ return (TypeDescriptor<V>) res;
+ }
+ }
+
+ /**
+ * Returns a type descriptor for the input of the given {@link SerializableFunction}, subject to
+ * Java type erasure: returns {@code null} if the type was erased.
+ */
+ @Nullable
+ public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
+ SerializableFunction<InputT, OutputT> fn) {
+ return extractFromTypeParameters(
+ fn,
+ SerializableFunction.class,
+ new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, InputT>() {});
+ }
+
+ /**
+ * Returns a type descriptor for the output of the given {@link SerializableFunction}, subject to
+ * Java type erasure: returns {@code null} if the type was erased.
+ */
+ @Nullable
+ public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
+ SerializableFunction<InputT, OutputT> fn) {
+ return extractFromTypeParameters(
+ fn,
+ SerializableFunction.class,
+ new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, OutputT>() {});
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
index 1bf0fc9..a4f58da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
@@ -25,6 +25,7 @@ import static org.apache.beam.sdk.values.TypeDescriptors.sets;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
import java.util.List;
import java.util.Set;
@@ -70,4 +71,52 @@ public class TypeDescriptorsTest {
assertNotEquals(descriptor, new TypeDescriptor<List<String>>() {});
assertNotEquals(descriptor, new TypeDescriptor<List<Boolean>>() {});
}
+
+ private interface Generic<FooT, BarT> {}
+
+ private static <ActualFooT> Generic<ActualFooT, String> typeErasedGeneric() {
+ return new Generic<ActualFooT, String>() {};
+ }
+
+ private static <ActualFooT, ActualBarT> TypeDescriptor<ActualFooT> extractFooT(
+ Generic<ActualFooT, ActualBarT> instance) {
+ return TypeDescriptors.extractFromTypeParameters(
+ instance,
+ Generic.class,
+ new TypeDescriptors.TypeVariableExtractor<
+ Generic<ActualFooT, ActualBarT>, ActualFooT>() {});
+ }
+
+ private static <ActualFooT, ActualBarT> TypeDescriptor<ActualBarT> extractBarT(
+ Generic<ActualFooT, ActualBarT> instance) {
+ return TypeDescriptors.extractFromTypeParameters(
+ instance,
+ Generic.class,
+ new TypeDescriptors.TypeVariableExtractor<
+ Generic<ActualFooT, ActualBarT>, ActualBarT>() {});
+ }
+
+ private static <ActualFooT, ActualBarT> TypeDescriptor<KV<ActualFooT, ActualBarT>> extractKV(
+ Generic<ActualFooT, ActualBarT> instance) {
+ return TypeDescriptors.extractFromTypeParameters(
+ instance,
+ Generic.class,
+ new TypeDescriptors.TypeVariableExtractor<
+ Generic<ActualFooT, ActualBarT>, KV<ActualFooT, ActualBarT>>() {});
+ }
+
+ @Test
+ public void testTypeDescriptorsTypeParameterOf() throws Exception {
+ assertEquals(strings(), extractFooT(new Generic<String, Integer>() {}));
+ assertEquals(integers(), extractBarT(new Generic<String, Integer>() {}));
+ assertEquals(kvs(strings(), integers()), extractKV(new Generic<String, Integer>() {}));
+ }
+
+ @Test
+ public void testTypeDescriptorsTypeParameterOfErased() throws Exception {
+ Generic<Integer, String> instance = TypeDescriptorsTest.typeErasedGeneric();
+ assertNull(extractFooT(instance));
+ assertEquals(strings(), extractBarT(instance));
+ assertNull(extractKV(instance));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62c922b3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index c5c2462..ea4fc4e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -19,11 +19,11 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.Lists;
import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
/**
@@ -157,17 +158,16 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
return destinationCoder;
}
// If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
- // We must first use reflection to figure out what the type parameter is.
- TypeDescriptor<?> superDescriptor =
- TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
- if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
- throw new AssertionError(
- "Couldn't find the DynamicDestinations superclass of " + this.getClass());
- }
- TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
- @SuppressWarnings("unchecked")
TypeDescriptor<DestinationT> descriptor =
- (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
+ extractFromTypeParameters(
+ this,
+ DynamicDestinations.class,
+ new TypeDescriptors.TypeVariableExtractor<
+ DynamicDestinations<T, DestinationT>, DestinationT>() {});
+ checkArgument(
+ descriptor != null,
+ "Unable to infer a coder for DestinationT, "
+ + "please specify it explicitly by overriding getDestinationCoder()");
return registry.getCoder(descriptor);
}
}
[4/4] beam git commit: This closes #3632: [BEAM-2677]
AvroIO.parseGenericRecords - schemaless AvroIO.read
Posted by jk...@apache.org.
This closes #3632: [BEAM-2677] AvroIO.parseGenericRecords - schemaless AvroIO.read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a94d680e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a94d680e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a94d680e
Branch: refs/heads/master
Commit: a94d680eabd47a69be628028799ac4d41bd110d5
Parents: 7ab8954 c16947e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 28 10:25:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 10:25:35 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 199 ++++++++++++++++++-
.../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++----
.../org/apache/beam/sdk/io/FileBasedSink.java | 24 +--
.../apache/beam/sdk/values/TypeDescriptor.java | 64 ++++--
.../apache/beam/sdk/values/TypeDescriptors.java | 118 ++++++++++-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 89 ++++++---
.../org/apache/beam/sdk/io/AvroSourceTest.java | 30 ++-
.../beam/sdk/values/TypeDescriptorsTest.java | 49 +++++
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 4 +-
.../io/gcp/bigquery/DynamicDestinations.java | 22 +-
.../sdk/io/gcp/bigquery/TransformingSource.java | 136 -------------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 68 -------
12 files changed, 641 insertions(+), 328 deletions(-)
----------------------------------------------------------------------
[2/4] beam git commit: [BEAM-2677] AvroIO.parseGenericRecords -
schemaless AvroIO.read
Posted by jk...@apache.org.
[BEAM-2677] AvroIO.parseGenericRecords - schemaless AvroIO.read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebd00411
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebd00411
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebd00411
Branch: refs/heads/master
Commit: ebd004119c387787d0e0fcd0487e1b2754c7dbc5
Parents: 62c922b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Jul 24 15:07:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 10:25:07 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 199 ++++++++++++++++++-
.../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++----
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 89 ++++++---
.../org/apache/beam/sdk/io/AvroSourceTest.java | 30 ++-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 4 +-
5 files changed, 406 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 018b84f..27c9073 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -35,7 +35,9 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
@@ -53,13 +55,16 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
/**
* {@link PTransform}s for reading and writing Avro files.
*
- * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using
- * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. Alternatively, if
- * the filepatterns to be read are themselves in a {@link PCollection}, apply {@link #readAll}.
+ * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
+ * pipeline construction time, use {@code AvroIO.read()}, using {@link AvroIO.Read#from} to specify
+ * the filename or filepattern to read from. Alternatively, if the filepatterns to be read are
+ * themselves in a {@link PCollection}, apply {@link #readAll}.
*
* <p>See {@link FileSystems} for information on supported file systems and filepatterns.
*
@@ -70,6 +75,12 @@ import org.apache.beam.sdk.values.PDone;
* schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link
* #readAllGenericRecords}.
*
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link
+ * #parseAllGenericRecords}.
+ *
* <p>For example:
*
* <pre>{@code
@@ -84,12 +95,20 @@ import org.apache.beam.sdk.values.PDone;
* PCollection<GenericRecord> records =
* p.apply(AvroIO.readGenericRecords(schema)
* .from("gs://my_bucket/path/to/records-*.avro"));
+ *
+ * PCollection<Foo> records =
+ * p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
+ * public Foo apply(GenericRecord record) {
+ * // If needed, access the schema of the record using record.getSchema()
+ * return ...;
+ * }
+ * }).from("gs://my_bucket/path/to/records-*.avro"));
* }</pre>
*
* <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
- * scalability. Note that it may decrease performance if the filepattern matches only a small
- * number of files.
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} or {@link
+ * Parse#withHintMatchesManyFiles} for better performance and scalability. Note that it may decrease
+ * performance if the filepattern matches only a small number of files.
*
* <p>Reading from a {@link PCollection} of filepatterns:
*
@@ -101,6 +120,8 @@ import org.apache.beam.sdk.values.PDone;
* filepatterns.apply(AvroIO.read(AvroAutoGenClass.class));
* PCollection<GenericRecord> genericRecords =
* filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * PCollection<Foo> records =
+ * filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...));
* }</pre>
*
* <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
@@ -208,6 +229,29 @@ public class AvroIO {
}
/**
+ * Reads Avro file(s) containing records of an unspecified schema and converts each record to a
+ * custom type.
+ */
+ public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
+ return new AutoValue_AvroIO_Parse.Builder<T>()
+ .setParseFn(parseFn)
+ .setHintMatchesManyFiles(false)
+ .build();
+ }
+
+ /**
+ * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
+ * input {@link PCollection}.
+ */
+ public static <T> ParseAll<T> parseAllGenericRecords(
+ SerializableFunction<GenericRecord, T> parseFn) {
+ return new AutoValue_AvroIO_ParseAll.Builder<T>()
+ .setParseFn(parseFn)
+ .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+ .build();
+ }
+
+ /**
* Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
* pattern).
*/
@@ -387,6 +431,149 @@ public class AvroIO {
/////////////////////////////////////////////////////////////////////////////
+ /** Implementation of {@link #parseGenericRecords}. */
+ @AutoValue
+ public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
+ @Nullable abstract ValueProvider<String> getFilepattern();
+ abstract SerializableFunction<GenericRecord, T> getParseFn();
+ @Nullable abstract Coder<T> getCoder();
+ abstract boolean getHintMatchesManyFiles();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+ abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+ abstract Builder<T> setCoder(Coder<T> coder);
+ abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
+
+ abstract Parse<T> build();
+ }
+
+ /** Reads from the given filename or filepattern. */
+ public Parse<T> from(String filepattern) {
+ return from(StaticValueProvider.of(filepattern));
+ }
+
+ /** Like {@link #from(String)}. */
+ public Parse<T> from(ValueProvider<String> filepattern) {
+ return toBuilder().setFilepattern(filepattern).build();
+ }
+
+ /** Sets a coder for the result of the parse function. */
+ public Parse<T> withCoder(Coder<T> coder) {
+ return toBuilder().setCoder(coder).build();
+ }
+
+ /** Like {@link Read#withHintMatchesManyFiles()}. */
+ public Parse<T> withHintMatchesManyFiles() {
+ return toBuilder().setHintMatchesManyFiles(true).build();
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ checkNotNull(getFilepattern(), "filepattern");
+ Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+ if (getHintMatchesManyFiles()) {
+ return input
+ .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+ .apply(parseAllGenericRecords(getParseFn()).withCoder(getCoder()));
+ }
+ return input.apply(
+ org.apache.beam.sdk.io.Read.from(
+ AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
+ }
+
+ private static <T> Coder<T> inferCoder(
+ @Nullable Coder<T> explicitCoder,
+ SerializableFunction<GenericRecord, T> parseFn,
+ CoderRegistry coderRegistry) {
+ if (explicitCoder != null) {
+ return explicitCoder;
+ }
+ // If a coder was not specified explicitly, infer it from parse fn.
+ TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(parseFn);
+ String message =
+ "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().";
+ checkArgument(descriptor != null, message);
+ try {
+ return coderRegistry.getCoder(descriptor);
+ } catch (CannotProvideCoderException e) {
+ throw new IllegalArgumentException(message, e);
+ }
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .addIfNotNull(
+ DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+ .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /** Implementation of {@link #parseAllGenericRecords}. */
+ @AutoValue
+ public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+ abstract SerializableFunction<GenericRecord, T> getParseFn();
+ @Nullable abstract Coder<T> getCoder();
+ abstract long getDesiredBundleSizeBytes();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+ abstract Builder<T> setCoder(Coder<T> coder);
+ abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+ abstract ParseAll<T> build();
+ }
+
+ /** Specifies the coder for the result of the {@code parseFn}. */
+ public ParseAll<T> withCoder(Coder<T> coder) {
+ return toBuilder().setCoder(coder).build();
+ }
+
+ @VisibleForTesting
+ ParseAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+ return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<String> input) {
+ final Coder<T> coder =
+ Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+ SerializableFunction<String, FileBasedSource<T>> createSource =
+ new SerializableFunction<String, FileBasedSource<T>>() {
+ @Override
+ public FileBasedSource<T> apply(String input) {
+ return AvroSource.from(input).withParseFn(getParseFn(), coder);
+ }
+ };
+ return input
+ .apply(
+ "Parse all via FileBasedSource",
+ new ReadAllViaFileBasedSource<>(
+ SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
+ getDesiredBundleSizeBytes(),
+ createSource))
+ .setCoder(coder);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index a98d870..d277503 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -27,8 +28,10 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
import java.io.ObjectStreamException;
import java.io.PushbackInputStream;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
@@ -53,10 +56,12 @@ import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
@@ -130,19 +135,84 @@ public class AvroSource<T> extends BlockBasedSource<T> {
// The default sync interval is 64k.
private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
- // The type of the records contained in the file.
- private final Class<T> type;
+ // Use cases of AvroSource are:
+ // 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema.
+ // 2) AvroSource<Foo> Reading records of a generated Avro class Foo.
+ // 3) AvroSource<T> Reading GenericRecord records with an unspecified schema
+ // and converting them to type T.
+ // | Case 1 | Case 2 | Case 3 |
+ // type | GenericRecord | Foo | GenericRecord |
+ // readerSchemaString | non-null | non-null | null |
+ // parseFn | null | null | non-null |
+ // outputCoder | null | null | non-null |
+ private static class Mode<T> implements Serializable {
+ private final Class<?> type;
+
+ // The JSON schema used to decode records.
+ @Nullable
+ private String readerSchemaString;
+
+ @Nullable
+ private final SerializableFunction<GenericRecord, T> parseFn;
+
+ @Nullable
+ private final Coder<T> outputCoder;
+
+ private Mode(
+ Class<?> type,
+ @Nullable String readerSchemaString,
+ @Nullable SerializableFunction<GenericRecord, T> parseFn,
+ @Nullable Coder<T> outputCoder) {
+ this.type = type;
+ this.readerSchemaString = internSchemaString(readerSchemaString);
+ this.parseFn = parseFn;
+ this.outputCoder = outputCoder;
+ }
- // The JSON schema used to decode records.
- @Nullable
- private final String readerSchemaString;
+ private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
+ is.defaultReadObject();
+ readerSchemaString = internSchemaString(readerSchemaString);
+ }
+
+ private Coder<T> getOutputCoder() {
+ if (parseFn == null) {
+ return AvroCoder.of((Class<T>) type, internOrParseSchemaString(readerSchemaString));
+ } else {
+ return outputCoder;
+ }
+ }
+
+ private void validate() {
+ if (parseFn == null) {
+ checkArgument(
+ readerSchemaString != null,
+ "schema must be specified using withSchema() when not using a parse fn");
+ }
+ }
+ }
+
+ private static Mode<GenericRecord> readGenericRecordsWithSchema(String schema) {
+ return new Mode<>(GenericRecord.class, schema, null, null);
+ }
+ private static <T> Mode<T> readGeneratedClasses(Class<T> clazz) {
+ return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(), null, null);
+ }
+ private static <T> Mode<T> parseGenericRecords(
+ SerializableFunction<GenericRecord, T> parseFn, Coder<T> outputCoder) {
+ return new Mode<>(GenericRecord.class, null, parseFn, outputCoder);
+ }
+
+ private final Mode<T> mode;
/**
- * Reads from the given file name or pattern ("glob"). The returned source can be further
- * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
+ * Reads from the given file name or pattern ("glob"). The returned source needs to be further
+ * configured by calling {@link #withSchema} or {@link #withParseFn} before it can be read.
*/
public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
- return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class);
+ return new AvroSource<>(
+ fileNameOrPattern,
+ DEFAULT_MIN_BUNDLE_SIZE,
+ readGenericRecordsWithSchema(null /* set via withSchema, or replaced via withParseFn */));
}
/** Like {@link #from(ValueProvider)}. */
@@ -152,23 +222,40 @@ public class AvroSource<T> extends BlockBasedSource<T> {
/** Reads files containing records that conform to the given schema. */
public AvroSource<GenericRecord> withSchema(String schema) {
+ checkNotNull(schema, "schema");
return new AvroSource<>(
- getFileOrPatternSpecProvider(), getMinBundleSize(), schema, GenericRecord.class);
+ getFileOrPatternSpecProvider(),
+ getMinBundleSize(),
+ readGenericRecordsWithSchema(schema));
}
/** Like {@link #withSchema(String)}. */
public AvroSource<GenericRecord> withSchema(Schema schema) {
- return new AvroSource<>(
- getFileOrPatternSpecProvider(), getMinBundleSize(), schema.toString(), GenericRecord.class);
+ checkNotNull(schema, "schema");
+ return withSchema(schema.toString());
}
/** Reads files containing records of the given class. */
public <X> AvroSource<X> withSchema(Class<X> clazz) {
+ checkNotNull(clazz, "clazz");
+ return new AvroSource<>(
+ getFileOrPatternSpecProvider(),
+ getMinBundleSize(),
+ readGeneratedClasses(clazz));
+ }
+
+ /**
+ * Reads {@link GenericRecord}s of unspecified schema and maps them to instances of a custom type
+ * using the given {@code parseFn}, encoding the results with the given {@code coder}.
+ */
+ public <X> AvroSource<X> withParseFn(
+ SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {
+ checkNotNull(parseFn, "parseFn");
+ checkNotNull(coder, "coder");
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getMinBundleSize(),
- ReflectData.get().getSchema(clazz).toString(),
- clazz);
+ parseGenericRecords(parseFn, coder));
}
/**
@@ -176,19 +263,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
* minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
- return new AvroSource<>(
- getFileOrPatternSpecProvider(), minBundleSize, readerSchemaString, type);
+ return new AvroSource<>(getFileOrPatternSpecProvider(), minBundleSize, mode);
}
/** Constructor for FILEPATTERN mode. */
private AvroSource(
ValueProvider<String> fileNameOrPattern,
long minBundleSize,
- String readerSchemaString,
- Class<T> type) {
+ Mode<T> mode) {
super(fileNameOrPattern, minBundleSize);
- this.readerSchemaString = internSchemaString(readerSchemaString);
- this.type = type;
+ this.mode = mode;
}
/** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
@@ -197,18 +281,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
long minBundleSize,
long startOffset,
long endOffset,
- String readerSchemaString,
- Class<T> type) {
+ Mode<T> mode) {
super(metadata, minBundleSize, startOffset, endOffset);
- this.readerSchemaString = internSchemaString(readerSchemaString);
- this.type = type;
+ this.mode = mode;
}
@Override
public void validate() {
- // AvroSource objects do not need to be configured with more than a file pattern. Overridden to
- // make this explicit.
super.validate();
+ mode.validate();
}
/**
@@ -225,7 +306,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
@Override
public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
- return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readerSchemaString, type);
+ return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, mode);
}
@Override
@@ -234,14 +315,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
}
@Override
- public AvroCoder<T> getDefaultOutputCoder() {
- return AvroCoder.of(type, internOrParseSchemaString(readerSchemaString));
+ public Coder<T> getDefaultOutputCoder() {
+ return mode.getOutputCoder();
}
@VisibleForTesting
@Nullable
String getReaderSchemaString() {
- return readerSchemaString;
+ return mode.readerSchemaString;
}
/** Avro file metadata. */
@@ -380,15 +461,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
switch (getMode()) {
case SINGLE_FILE_OR_SUBRANGE:
return new AvroSource<>(
- getSingleFileMetadata(),
- getMinBundleSize(),
- getStartOffset(),
- getEndOffset(),
- readerSchemaString,
- type);
+ getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
case FILEPATTERN:
- return new AvroSource<>(
- getFileOrPatternSpecProvider(), getMinBundleSize(), readerSchemaString, type);
+ return new AvroSource<>(getFileOrPatternSpecProvider(), getMinBundleSize(), mode);
default:
throw new InvalidObjectException(
String.format("Unknown mode %s for AvroSource %s", getMode(), this));
@@ -402,6 +477,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
static class AvroBlock<T> extends Block<T> {
+ private final Mode<T> mode;
+
// The number of records in the block.
private final long numRecords;
@@ -412,7 +489,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
private long currentRecordIndex = 0;
// A DatumReader to read records from the block.
- private final DatumReader<T> reader;
+ private final DatumReader<?> reader;
// A BinaryDecoder used by the reader to decode records.
private final BinaryDecoder decoder;
@@ -455,19 +532,19 @@ public class AvroSource<T> extends BlockBasedSource<T> {
AvroBlock(
byte[] data,
long numRecords,
- Class<? extends T> type,
- String readerSchemaString,
+ Mode<T> mode,
String writerSchemaString,
String codec)
throws IOException {
+ this.mode = mode;
this.numRecords = numRecords;
checkNotNull(writerSchemaString, "writerSchemaString");
Schema writerSchema = internOrParseSchemaString(writerSchemaString);
Schema readerSchema =
internOrParseSchemaString(
- MoreObjects.firstNonNull(readerSchemaString, writerSchemaString));
+ MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString));
this.reader =
- (type == GenericRecord.class)
+ (mode.type == GenericRecord.class)
? new GenericDatumReader<T>(writerSchema, readerSchema)
: new ReflectDatumReader<T>(writerSchema, readerSchema);
this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
@@ -483,7 +560,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
if (currentRecordIndex >= numRecords) {
return false;
}
- currentRecord = reader.read(null, decoder);
+ Object record = reader.read(null, decoder);
+ currentRecord =
+ (mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record);
currentRecordIndex++;
return true;
}
@@ -585,8 +664,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
new AvroBlock<>(
data,
numRecords,
- getCurrentSource().type,
- getCurrentSource().readerSchemaString,
+ getCurrentSource().mode,
metadata.getSchemaString(),
metadata.getCodec());
http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 90cd824..154ff5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -114,9 +115,9 @@ public class AvroIOTest {
public GenericClass() {}
- public GenericClass(int intValue, String stringValue) {
- this.intField = intValue;
- this.stringField = stringValue;
+ public GenericClass(int intField, String stringField) {
+ this.intField = intField;
+ this.stringField = stringField;
}
@Override
@@ -142,9 +143,18 @@ public class AvroIOTest {
}
}
+ private static class ParseGenericClass
+ implements SerializableFunction<GenericRecord, GenericClass> {
+ @Override
+ public GenericClass apply(GenericRecord input) {
+ return new GenericClass(
+ (int) input.get("intField"), input.get("stringField").toString());
+ }
+ }
+
@Test
@Category(NeedsRunner.class)
- public void testAvroIOWriteAndReadASingleFile() throws Throwable {
+ public void testAvroIOWriteAndReadAndParseASingleFile() throws Throwable {
List<GenericClass> values =
ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
File outputFile = tmpFolder.newFile("output.avro");
@@ -153,23 +163,45 @@ public class AvroIOTest {
.apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
writePipeline.run().waitUntilFinish();
- // Test the same data via read(), read().withHintMatchesManyFiles(), and readAll()
+ // Test the same data using all versions of read().
+ PCollection<String> path =
+ readPipeline.apply("Create path", Create.of(outputFile.getAbsolutePath()));
PAssert.that(
- readPipeline.apply(
- "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+ readPipeline.apply(
+ "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
.containsInAnyOrder(values);
PAssert.that(
- readPipeline.apply(
- "Read withHintMatchesManyFiles",
- AvroIO.read(GenericClass.class)
- .from(outputFile.getAbsolutePath())
- .withHintMatchesManyFiles()))
+ readPipeline.apply(
+ "Read withHintMatchesManyFiles",
+ AvroIO.read(GenericClass.class)
+ .from(outputFile.getAbsolutePath())
+ .withHintMatchesManyFiles()))
.containsInAnyOrder(values);
PAssert.that(
- "ReadAll",
- readPipeline
- .apply(Create.of(outputFile.getAbsolutePath()))
- .apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ path.apply(
+ "ReadAll", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ .containsInAnyOrder(values);
+ PAssert.that(
+ readPipeline.apply(
+ "Parse",
+ AvroIO.parseGenericRecords(new ParseGenericClass())
+ .from(outputFile.getAbsolutePath())
+ .withCoder(AvroCoder.of(GenericClass.class))))
+ .containsInAnyOrder(values);
+ PAssert.that(
+ readPipeline.apply(
+ "Parse withHintMatchesManyFiles",
+ AvroIO.parseGenericRecords(new ParseGenericClass())
+ .from(outputFile.getAbsolutePath())
+ .withCoder(AvroCoder.of(GenericClass.class))
+ .withHintMatchesManyFiles()))
+ .containsInAnyOrder(values);
+ PAssert.that(
+ path.apply(
+ "ParseAll",
+ AvroIO.parseAllGenericRecords(new ParseGenericClass())
+ .withCoder(AvroCoder.of(GenericClass.class))
+ .withDesiredBundleSizeBytes(10)))
.containsInAnyOrder(values);
readPipeline.run();
@@ -200,7 +232,7 @@ public class AvroIOTest {
.withNumShards(3));
writePipeline.run().waitUntilFinish();
- // Test both read() and readAll()
+ // Test read(), readAll(), and parseAllGenericRecords().
PAssert.that(
readPipeline.apply(
"Read first",
@@ -213,15 +245,22 @@ public class AvroIOTest {
AvroIO.read(GenericClass.class)
.from(tmpFolder.getRoot().getAbsolutePath() + "/second*")))
.containsInAnyOrder(secondValues);
+ PCollection<String> paths =
+ readPipeline.apply(
+ "Create paths",
+ Create.of(
+ tmpFolder.getRoot().getAbsolutePath() + "/first*",
+ tmpFolder.getRoot().getAbsolutePath() + "/second*"));
+ PAssert.that(
+ paths.apply(
+ "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
PAssert.that(
- readPipeline
- .apply(
- "Create paths",
- Create.of(
- tmpFolder.getRoot().getAbsolutePath() + "/first*",
- tmpFolder.getRoot().getAbsolutePath() + "/second*"))
- .apply(
- "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ paths.apply(
+ "Parse all",
+ AvroIO.parseAllGenericRecords(new ParseGenericClass())
+ .withCoder(AvroCoder.of(GenericClass.class))
+ .withDesiredBundleSizeBytes(10)))
.containsInAnyOrder(Iterables.concat(firstValues, secondValues));
readPipeline.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index bf2ac95..714e029 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.SerializableUtils;
import org.hamcrest.Matchers;
@@ -407,11 +408,6 @@ public class AvroSourceTest {
source = AvroSource.from(filename).withSchema(schemaString);
records = SourceTestUtils.readFromSource(source, null);
assertEqualsWithGeneric(expected, records);
-
- // Create a source with no schema
- source = AvroSource.from(filename);
- records = SourceTestUtils.readFromSource(source, null);
- assertEqualsWithGeneric(expected, records);
}
@Test
@@ -449,6 +445,30 @@ public class AvroSourceTest {
assertSame(sourceA.getReaderSchemaString(), sourceC.getReaderSchemaString());
}
+ @Test
+ public void testParseFn() throws Exception {
+ List<Bird> expected = createRandomRecords(100);
+ String filename = generateTestFile("tmp.avro", expected, SyncBehavior.SYNC_DEFAULT, 0,
+ AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
+
+ AvroSource<Bird> source =
+ AvroSource.from(filename)
+ .withParseFn(
+ new SerializableFunction<GenericRecord, Bird>() {
+ @Override
+ public Bird apply(GenericRecord input) {
+ return new Bird(
+ (long) input.get("number"),
+ input.get("species").toString(),
+ input.get("quality").toString(),
+ (long) input.get("quantity"));
+ }
+ },
+ AvroCoder.of(Bird.class));
+ List<Bird> actual = SourceTestUtils.readFromSource(source, null);
+ assertThat(actual, containsInAnyOrder(expected.toArray()));
+ }
+
private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) {
assertEquals(expected.size(), actual.size());
for (int i = 0; i < expected.size(); i++) {
http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 2b1eafe..6c118a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -183,8 +183,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
for (ResourceId file : files) {
- avroSources.add(new TransformingSource<>(
- AvroSource.from(file.toString()), function, getDefaultOutputCoder()));
+ avroSources.add(
+ AvroSource.from(file.toString()).withParseFn(function, getDefaultOutputCoder()));
}
return ImmutableList.copyOf(avroSources);
}
[3/4] beam git commit: Removes TransformingSource that is now unused
Posted by jk...@apache.org.
Removes TransformingSource that is now unused
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c16947ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c16947ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c16947ec
Branch: refs/heads/master
Commit: c16947ec01d21ef99a5e2024d7aaead3c7a4399f
Parents: ebd0041
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jul 25 23:35:00 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 10:25:07 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/bigquery/TransformingSource.java | 136 -------------------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 68 ----------
2 files changed, 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c16947ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
deleted file mode 100644
index b8e6b39..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.joda.time.Instant;
-
-/**
- * A {@link BoundedSource} that reads from {@code BoundedSource<T>}
- * and transforms elements to type {@code V}.
-*/
-@VisibleForTesting
-class TransformingSource<T, V> extends BoundedSource<V> {
- private final BoundedSource<T> boundedSource;
- private final SerializableFunction<T, V> function;
- private final Coder<V> outputCoder;
-
- TransformingSource(
- BoundedSource<T> boundedSource,
- SerializableFunction<T, V> function,
- Coder<V> outputCoder) {
- this.boundedSource = checkNotNull(boundedSource, "boundedSource");
- this.function = checkNotNull(function, "function");
- this.outputCoder = checkNotNull(outputCoder, "outputCoder");
- }
-
- @Override
- public List<? extends BoundedSource<V>> split(
- long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- return Lists.transform(
- boundedSource.split(desiredBundleSizeBytes, options),
- new Function<BoundedSource<T>, BoundedSource<V>>() {
- @Override
- public BoundedSource<V> apply(BoundedSource<T> input) {
- return new TransformingSource<>(input, function, outputCoder);
- }
- });
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return boundedSource.getEstimatedSizeBytes(options);
- }
-
- @Override
- public BoundedReader<V> createReader(PipelineOptions options) throws IOException {
- return new TransformingReader(boundedSource.createReader(options));
- }
-
- @Override
- public void validate() {
- boundedSource.validate();
- }
-
- @Override
- public Coder<V> getDefaultOutputCoder() {
- return outputCoder;
- }
-
- private class TransformingReader extends BoundedReader<V> {
- private final BoundedReader<T> boundedReader;
-
- private TransformingReader(BoundedReader<T> boundedReader) {
- this.boundedReader = checkNotNull(boundedReader, "boundedReader");
- }
-
- @Override
- public synchronized BoundedSource<V> getCurrentSource() {
- return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder);
- }
-
- @Override
- public boolean start() throws IOException {
- return boundedReader.start();
- }
-
- @Override
- public boolean advance() throws IOException {
- return boundedReader.advance();
- }
-
- @Override
- public V getCurrent() throws NoSuchElementException {
- T current = boundedReader.getCurrent();
- return function.apply(current);
- }
-
- @Override
- public void close() throws IOException {
- boundedReader.close();
- }
-
- @Override
- public synchronized BoundedSource<V> splitAtFraction(double fraction) {
- BoundedSource<T> split = boundedReader.splitAtFraction(fraction);
- return split == null ? null : new TransformingSource<>(split, function, outputCoder);
- }
-
- @Override
- public Double getFractionConsumed() {
- return boundedReader.getFractionConsumed();
- }
-
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- return boundedReader.getCurrentTimestamp();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c16947ec/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 3465b4e..8db4e94 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -86,7 +86,6 @@ import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -1510,8 +1509,6 @@ public class BigQueryIOTest implements Serializable {
// Simulate a repeated call to split(), like a Dataflow worker will sometimes do.
sources = bqSource.split(200, options);
assertEquals(2, sources.size());
- BoundedSource<TableRow> actual = sources.get(0);
- assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
// A repeated call to split() should not have caused a duplicate extract job.
assertEquals(1, fakeJobService.getNumExtractJobCalls());
@@ -1594,8 +1591,6 @@ public class BigQueryIOTest implements Serializable {
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(2, sources.size());
- BoundedSource<TableRow> actual = sources.get(0);
- assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
}
@Test
@@ -1673,69 +1668,6 @@ public class BigQueryIOTest implements Serializable {
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(2, sources.size());
- BoundedSource<TableRow> actual = sources.get(0);
- assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
- }
-
- @Test
- public void testTransformingSource() throws Exception {
- int numElements = 10000;
- @SuppressWarnings("deprecation")
- BoundedSource<Long> longSource = CountingSource.upTo(numElements);
- SerializableFunction<Long, String> toStringFn =
- new SerializableFunction<Long, String>() {
- @Override
- public String apply(Long input) {
- return input.toString();
- }};
- BoundedSource<String> stringSource = new TransformingSource<>(
- longSource, toStringFn, StringUtf8Coder.of());
-
- List<String> expected = Lists.newArrayList();
- for (int i = 0; i < numElements; i++) {
- expected.add(String.valueOf(i));
- }
-
- PipelineOptions options = PipelineOptionsFactory.create();
- Assert.assertThat(
- SourceTestUtils.readFromSource(stringSource, options),
- CoreMatchers.is(expected));
- SourceTestUtils.assertSplitAtFractionBehavior(
- stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options);
-
- SourceTestUtils.assertSourcesEqualReferenceSource(
- stringSource, stringSource.split(100, options), options);
- }
-
- @Test
- public void testTransformingSourceUnsplittable() throws Exception {
- int numElements = 10000;
- @SuppressWarnings("deprecation")
- BoundedSource<Long> longSource =
- SourceTestUtils.toUnsplittableSource(CountingSource.upTo(numElements));
- SerializableFunction<Long, String> toStringFn =
- new SerializableFunction<Long, String>() {
- @Override
- public String apply(Long input) {
- return input.toString();
- }
- };
- BoundedSource<String> stringSource =
- new TransformingSource<>(longSource, toStringFn, StringUtf8Coder.of());
-
- List<String> expected = Lists.newArrayList();
- for (int i = 0; i < numElements; i++) {
- expected.add(String.valueOf(i));
- }
-
- PipelineOptions options = PipelineOptionsFactory.create();
- Assert.assertThat(
- SourceTestUtils.readFromSource(stringSource, options), CoreMatchers.is(expected));
- SourceTestUtils.assertSplitAtFractionBehavior(
- stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-
- SourceTestUtils.assertSourcesEqualReferenceSource(
- stringSource, stringSource.split(100, options), options);
}
@Test