You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/07 19:59:17 UTC
[27/50] incubator-beam git commit: Switch DoFnSignature, etc, from TypeToken to TypeDescriptor
Switch DoFnSignature, etc, from TypeToken to TypeDescriptor
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8336b24c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8336b24c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8336b24c
Branch: refs/heads/apex-runner
Commit: 8336b24c97c620fa3edb02301299080bda96379a
Parents: d936ed8
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 1 14:48:54 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Nov 3 21:32:53 2016 -0700
----------------------------------------------------------------------
.../sdk/transforms/reflect/DoFnInvokers.java | 7 +-
.../sdk/transforms/reflect/DoFnSignature.java | 23 ++-
.../sdk/transforms/reflect/DoFnSignatures.java | 177 ++++++++++---------
.../DoFnSignaturesSplittableDoFnTest.java | 18 +-
.../transforms/reflect/DoFnSignaturesTest.java | 7 +-
.../reflect/DoFnSignaturesTestUtils.java | 8 +-
6 files changed, 124 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index dd134b7..c5a23dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.reflect;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.reflect.TypeToken;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -263,9 +262,9 @@ public class DoFnInvokers {
/** Default implementation of {@link DoFn.GetRestrictionCoder}, for delegation by bytebuddy. */
public static class DefaultRestrictionCoder {
- private final TypeToken<?> restrictionType;
+ private final TypeDescriptor<?> restrictionType;
- DefaultRestrictionCoder(TypeToken<?> restrictionType) {
+ DefaultRestrictionCoder(TypeDescriptor<?> restrictionType) {
this.restrictionType = restrictionType;
}
@@ -273,7 +272,7 @@ public class DoFnInvokers {
@SuppressWarnings({"unused", "unchecked"})
public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry registry)
throws CannotProvideCoderException {
- return (Coder) registry.getCoder(TypeDescriptor.of(restrictionType.getType()));
+ return (Coder) registry.getCoder(restrictionType);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 71f7e53..6b98805 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.reflect;
import com.google.auto.value.AutoValue;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
-import com.google.common.reflect.TypeToken;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collections;
@@ -342,7 +341,7 @@ public abstract class DoFnSignature {
/** Concrete type of the {@link RestrictionTracker} parameter, if present. */
@Nullable
- abstract TypeToken<?> trackerT();
+ abstract TypeDescriptor<?> trackerT();
/** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */
public abstract boolean hasReturnValue();
@@ -350,7 +349,7 @@ public abstract class DoFnSignature {
static ProcessElementMethod create(
Method targetMethod,
List<Parameter> extraParameters,
- TypeToken<?> trackerT,
+ TypeDescriptor<?> trackerT,
boolean hasReturnValue) {
return new AutoValue_DoFnSignature_ProcessElementMethod(
targetMethod, Collections.unmodifiableList(extraParameters), trackerT, hasReturnValue);
@@ -462,9 +461,9 @@ public abstract class DoFnSignature {
public abstract Method targetMethod();
/** Type of the returned restriction. */
- abstract TypeToken<?> restrictionT();
+ abstract TypeDescriptor<?> restrictionT();
- static GetInitialRestrictionMethod create(Method targetMethod, TypeToken<?> restrictionT) {
+ static GetInitialRestrictionMethod create(Method targetMethod, TypeDescriptor<?> restrictionT) {
return new AutoValue_DoFnSignature_GetInitialRestrictionMethod(targetMethod, restrictionT);
}
}
@@ -477,9 +476,9 @@ public abstract class DoFnSignature {
public abstract Method targetMethod();
/** Type of the restriction taken and returned. */
- abstract TypeToken<?> restrictionT();
+ abstract TypeDescriptor<?> restrictionT();
- static SplitRestrictionMethod create(Method targetMethod, TypeToken<?> restrictionT) {
+ static SplitRestrictionMethod create(Method targetMethod, TypeDescriptor<?> restrictionT) {
return new AutoValue_DoFnSignature_SplitRestrictionMethod(targetMethod, restrictionT);
}
}
@@ -492,13 +491,13 @@ public abstract class DoFnSignature {
public abstract Method targetMethod();
/** Type of the input restriction. */
- abstract TypeToken<?> restrictionT();
+ abstract TypeDescriptor<?> restrictionT();
/** Type of the returned {@link RestrictionTracker}. */
- abstract TypeToken<?> trackerT();
+ abstract TypeDescriptor<?> trackerT();
static NewTrackerMethod create(
- Method targetMethod, TypeToken<?> restrictionT, TypeToken<?> trackerT) {
+ Method targetMethod, TypeDescriptor<?> restrictionT, TypeDescriptor<?> trackerT) {
return new AutoValue_DoFnSignature_NewTrackerMethod(targetMethod, restrictionT, trackerT);
}
}
@@ -511,9 +510,9 @@ public abstract class DoFnSignature {
public abstract Method targetMethod();
/** Type of the returned {@link Coder}. */
- abstract TypeToken<?> coderT();
+ abstract TypeDescriptor<?> coderT();
- static GetRestrictionCoderMethod create(Method targetMethod, TypeToken<?> coderT) {
+ static GetRestrictionCoderMethod create(Method targetMethod, TypeDescriptor<?> coderT) {
return new AutoValue_DoFnSignature_GetRestrictionCoderMethod(targetMethod, coderT);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 5814c0e..c690ace 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -22,8 +22,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import com.google.common.reflect.TypeParameter;
-import com.google.common.reflect.TypeToken;
import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Field;
@@ -57,6 +55,7 @@ import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
/**
* Parses a {@link DoFn} and computes its {@link DoFnSignature}. See {@link #getSignature}.
@@ -90,18 +89,18 @@ public class DoFnSignatures {
errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be subtype of DoFn");
builder.setFnClass(fnClass);
- TypeToken<? extends DoFn<?, ?>> fnToken = TypeToken.of(fnClass);
+ TypeDescriptor<? extends DoFn<?, ?>> fnT = TypeDescriptor.of(fnClass);
// Extract the input and output type, and whether the fn is bounded.
- TypeToken<?> inputT = null;
- TypeToken<?> outputT = null;
- for (TypeToken<?> supertype : fnToken.getTypes()) {
+ TypeDescriptor<?> inputT = null;
+ TypeDescriptor<?> outputT = null;
+ for (TypeDescriptor<?> supertype : fnT.getTypes()) {
if (!supertype.getRawType().equals(DoFn.class)) {
continue;
}
Type[] args = ((ParameterizedType) supertype.getType()).getActualTypeArguments();
- inputT = TypeToken.of(args[0]);
- outputT = TypeToken.of(args[1]);
+ inputT = TypeDescriptor.of(args[0]);
+ outputT = TypeDescriptor.of(args[1]);
}
errors.checkNotNull(inputT, "Unable to determine input type");
@@ -169,7 +168,7 @@ public class DoFnSignatures {
DoFnSignature.ProcessElementMethod processElement =
analyzeProcessElementMethod(
processElementErrors,
- fnToken,
+ fnT,
processElementMethod,
inputT,
outputT,
@@ -180,14 +179,14 @@ public class DoFnSignatures {
if (startBundleMethod != null) {
ErrorReporter startBundleErrors = errors.forMethod(DoFn.StartBundle.class, startBundleMethod);
builder.setStartBundle(
- analyzeBundleMethod(startBundleErrors, fnToken, startBundleMethod, inputT, outputT));
+ analyzeBundleMethod(startBundleErrors, fnT, startBundleMethod, inputT, outputT));
}
if (finishBundleMethod != null) {
ErrorReporter finishBundleErrors =
errors.forMethod(DoFn.FinishBundle.class, finishBundleMethod);
builder.setFinishBundle(
- analyzeBundleMethod(finishBundleErrors, fnToken, finishBundleMethod, inputT, outputT));
+ analyzeBundleMethod(finishBundleErrors, fnT, finishBundleMethod, inputT, outputT));
}
if (setupMethod != null) {
@@ -209,7 +208,7 @@ public class DoFnSignatures {
builder.setGetInitialRestriction(
getInitialRestriction =
analyzeGetInitialRestrictionMethod(
- getInitialRestrictionErrors, fnToken, getInitialRestrictionMethod, inputT));
+ getInitialRestrictionErrors, fnT, getInitialRestrictionMethod, inputT));
}
DoFnSignature.SplitRestrictionMethod splitRestriction = null;
@@ -219,7 +218,7 @@ public class DoFnSignatures {
builder.setSplitRestriction(
splitRestriction =
analyzeSplitRestrictionMethod(
- splitRestrictionErrors, fnToken, splitRestrictionMethod, inputT));
+ splitRestrictionErrors, fnT, splitRestrictionMethod, inputT));
}
DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = null;
@@ -229,17 +228,17 @@ public class DoFnSignatures {
builder.setGetRestrictionCoder(
getRestrictionCoder =
analyzeGetRestrictionCoderMethod(
- getRestrictionCoderErrors, fnToken, getRestrictionCoderMethod));
+ getRestrictionCoderErrors, fnT, getRestrictionCoderMethod));
}
DoFnSignature.NewTrackerMethod newTracker = null;
if (newTrackerMethod != null) {
ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class, newTrackerMethod);
builder.setNewTracker(
- newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnToken, newTrackerMethod));
+ newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnT, newTrackerMethod));
}
- builder.setIsBoundedPerElement(inferBoundedness(fnToken, processElement, errors));
+ builder.setIsBoundedPerElement(inferBoundedness(fnT, processElement, errors));
DoFnSignature signature = builder.build();
@@ -271,11 +270,11 @@ public class DoFnSignatures {
* </ol>
*/
private static PCollection.IsBounded inferBoundedness(
- TypeToken<? extends DoFn> fnToken,
+ TypeDescriptor<? extends DoFn> fnT,
DoFnSignature.ProcessElementMethod processElement,
ErrorReporter errors) {
PCollection.IsBounded isBounded = null;
- for (TypeToken<?> supertype : fnToken.getTypes()) {
+ for (TypeDescriptor<?> supertype : fnT.getTypes()) {
if (supertype.getRawType().isAnnotationPresent(DoFn.BoundedPerElement.class)
|| supertype.getRawType().isAnnotationPresent(DoFn.UnboundedPerElement.class)) {
errors.checkArgument(
@@ -354,7 +353,7 @@ public class DoFnSignatures {
ErrorReporter getInitialRestrictionErrors =
errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod());
- TypeToken<?> restrictionT = getInitialRestriction.restrictionT();
+ TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
getInitialRestrictionErrors.checkArgument(
restrictionT.equals(newTracker.restrictionT()),
@@ -411,49 +410,54 @@ public class DoFnSignatures {
}
/**
- * Generates a type token for {@code DoFn<InputT, OutputT>.ProcessContext} given {@code InputT}
- * and {@code OutputT}.
+ * Generates a {@link TypeDescriptor} for {@code DoFn<InputT, OutputT>.ProcessContext} given
+ * {@code InputT} and {@code OutputT}.
*/
private static <InputT, OutputT>
- TypeToken<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf(
- TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
- return new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {}.where(
+ TypeDescriptor<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf(
+ TypeDescriptor<InputT> inputT, TypeDescriptor<OutputT> outputT) {
+ return new TypeDescriptor<DoFn<InputT, OutputT>.ProcessContext>() {}.where(
new TypeParameter<InputT>() {}, inputT)
.where(new TypeParameter<OutputT>() {}, outputT);
}
/**
- * Generates a type token for {@code DoFn<InputT, OutputT>.Context} given {@code InputT} and
- * {@code OutputT}.
+ * Generates a {@link TypeDescriptor} for {@code DoFn<InputT, OutputT>.Context} given {@code
+ * InputT} and {@code OutputT}.
*/
- private static <InputT, OutputT> TypeToken<DoFn<InputT, OutputT>.Context> doFnContextTypeOf(
- TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
- return new TypeToken<DoFn<InputT, OutputT>.Context>() {}.where(
+ private static <InputT, OutputT> TypeDescriptor<DoFn<InputT, OutputT>.Context> doFnContextTypeOf(
+ TypeDescriptor<InputT> inputT, TypeDescriptor<OutputT> outputT) {
+ return new TypeDescriptor<DoFn<InputT, OutputT>.Context>() {}.where(
new TypeParameter<InputT>() {}, inputT)
.where(new TypeParameter<OutputT>() {}, outputT);
}
- /** Generates a type token for {@code DoFn.InputProvider<InputT>} given {@code InputT}. */
- private static <InputT> TypeToken<DoFn.InputProvider<InputT>> inputProviderTypeOf(
- TypeToken<InputT> inputT) {
- return new TypeToken<DoFn.InputProvider<InputT>>() {}.where(
+ /**
+ * Generates a {@link TypeDescriptor} for {@code DoFn.InputProvider<InputT>} given {@code InputT}.
+ */
+ private static <InputT> TypeDescriptor<DoFn.InputProvider<InputT>> inputProviderTypeOf(
+ TypeDescriptor<InputT> inputT) {
+ return new TypeDescriptor<DoFn.InputProvider<InputT>>() {}.where(
new TypeParameter<InputT>() {}, inputT);
}
- /** Generates a type token for {@code DoFn.OutputReceiver<OutputT>} given {@code OutputT}. */
- private static <OutputT> TypeToken<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf(
- TypeToken<OutputT> inputT) {
- return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
+ /**
+ * Generates a {@link TypeDescriptor} for {@code DoFn.OutputReceiver<OutputT>} given {@code
+ * OutputT}.
+ */
+ private static <OutputT> TypeDescriptor<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf(
+ TypeDescriptor<OutputT> inputT) {
+ return new TypeDescriptor<DoFn.OutputReceiver<OutputT>>() {}.where(
new TypeParameter<OutputT>() {}, inputT);
}
@VisibleForTesting
static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
ErrorReporter errors,
- TypeToken<? extends DoFn<?, ?>> fnClass,
+ TypeDescriptor<? extends DoFn<?, ?>> fnClass,
Method m,
- TypeToken<?> inputT,
- TypeToken<?> outputT,
+ TypeDescriptor<?> inputT,
+ TypeDescriptor<?> outputT,
Map<String, StateDeclaration> stateDeclarations,
Map<String, TimerDeclaration> timerDeclarations) {
errors.checkArgument(
@@ -462,27 +466,27 @@ public class DoFnSignatures {
"Must return void or %s",
DoFn.ProcessContinuation.class.getSimpleName());
- TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT);
+ TypeDescriptor<?> processContextT = doFnProcessContextTypeOf(inputT, outputT);
Type[] params = m.getGenericParameterTypes();
- TypeToken<?> contextToken = null;
+ TypeDescriptor<?> contextT = null;
if (params.length > 0) {
- contextToken = fnClass.resolveType(params[0]);
+ contextT = fnClass.resolveType(params[0]);
}
errors.checkArgument(
- contextToken != null && contextToken.equals(processContextToken),
+ contextT != null && contextT.equals(processContextT),
"Must take %s as the first argument",
- formatType(processContextToken));
+ formatType(processContextT));
List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
Map<String, DoFnSignature.Parameter> stateParameters = new HashMap<>();
Map<String, DoFnSignature.Parameter> timerParameters = new HashMap<>();
- TypeToken<?> trackerT = null;
+ TypeDescriptor<?> trackerT = null;
- TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
- TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
+ TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT);
+ TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
for (int i = 1; i < params.length; ++i) {
- TypeToken<?> paramT = fnClass.resolveType(params[i]);
+ TypeDescriptor<?> paramT = fnClass.resolveType(params[i]);
Class<?> rawType = paramT.getRawType();
if (rawType.equals(BoundedWindow.class)) {
errors.checkArgument(
@@ -641,8 +645,8 @@ public class DoFnSignatures {
} else {
List<String> allowedParamTypes =
Arrays.asList(
- formatType(new TypeToken<BoundedWindow>() {}),
- formatType(new TypeToken<RestrictionTracker<?>>() {}));
+ formatType(new TypeDescriptor<BoundedWindow>() {}),
+ formatType(new TypeDescriptor<RestrictionTracker<?>>() {}));
errors.throwIllegalArgument(
"%s is not a valid context parameter. Should be one of %s",
formatType(paramT), allowedParamTypes);
@@ -665,17 +669,17 @@ public class DoFnSignatures {
@VisibleForTesting
static DoFnSignature.BundleMethod analyzeBundleMethod(
ErrorReporter errors,
- TypeToken<? extends DoFn<?, ?>> fnToken,
+ TypeDescriptor<? extends DoFn<?, ?>> fnT,
Method m,
- TypeToken<?> inputT,
- TypeToken<?> outputT) {
+ TypeDescriptor<?> inputT,
+ TypeDescriptor<?> outputT) {
errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
- TypeToken<?> expectedContextToken = doFnContextTypeOf(inputT, outputT);
+ TypeDescriptor<?> expectedContextT = doFnContextTypeOf(inputT, outputT);
Type[] params = m.getGenericParameterTypes();
errors.checkArgument(
- params.length == 1 && fnToken.resolveType(params[0]).equals(expectedContextToken),
+ params.length == 1 && fnT.resolveType(params[0]).equals(expectedContextT),
"Must take a single argument of type %s",
- formatType(expectedContextToken));
+ formatType(expectedContextT));
return DoFnSignature.BundleMethod.create(m);
}
@@ -688,27 +692,33 @@ public class DoFnSignatures {
@VisibleForTesting
static DoFnSignature.GetInitialRestrictionMethod analyzeGetInitialRestrictionMethod(
- ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT) {
+ ErrorReporter errors,
+ TypeDescriptor<? extends DoFn> fnT,
+ Method m,
+ TypeDescriptor<?> inputT) {
// Method is of the form:
// @GetInitialRestriction
// RestrictionT getInitialRestriction(InputT element);
Type[] params = m.getGenericParameterTypes();
errors.checkArgument(
- params.length == 1 && fnToken.resolveType(params[0]).equals(inputT),
+ params.length == 1 && fnT.resolveType(params[0]).equals(inputT),
"Must take a single argument of type %s",
formatType(inputT));
return DoFnSignature.GetInitialRestrictionMethod.create(
- m, fnToken.resolveType(m.getGenericReturnType()));
+ m, fnT.resolveType(m.getGenericReturnType()));
}
- /** Generates a type token for {@code List<T>} given {@code T}. */
- private static <T> TypeToken<List<T>> listTypeOf(TypeToken<T> elementT) {
- return new TypeToken<List<T>>() {}.where(new TypeParameter<T>() {}, elementT);
+ /** Generates a {@link TypeDescriptor} for {@code List<T>} given {@code T}. */
+ private static <T> TypeDescriptor<List<T>> listTypeOf(TypeDescriptor<T> elementT) {
+ return new TypeDescriptor<List<T>>() {}.where(new TypeParameter<T>() {}, elementT);
}
@VisibleForTesting
static DoFnSignature.SplitRestrictionMethod analyzeSplitRestrictionMethod(
- ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT) {
+ ErrorReporter errors,
+ TypeDescriptor<? extends DoFn> fnT,
+ Method m,
+ TypeDescriptor<?> inputT) {
// Method is of the form:
// @SplitRestriction
// void splitRestriction(InputT element, RestrictionT restriction);
@@ -717,13 +727,13 @@ public class DoFnSignatures {
Type[] params = m.getGenericParameterTypes();
errors.checkArgument(params.length == 3, "Must have exactly 3 arguments");
errors.checkArgument(
- fnToken.resolveType(params[0]).equals(inputT),
+ fnT.resolveType(params[0]).equals(inputT),
"First argument must be the element type %s",
formatType(inputT));
- TypeToken<?> restrictionT = fnToken.resolveType(params[1]);
- TypeToken<?> receiverT = fnToken.resolveType(params[2]);
- TypeToken<?> expectedReceiverT = outputReceiverTypeOf(restrictionT);
+ TypeDescriptor<?> restrictionT = fnT.resolveType(params[1]);
+ TypeDescriptor<?> receiverT = fnT.resolveType(params[2]);
+ TypeDescriptor<?> expectedReceiverT = outputReceiverTypeOf(restrictionT);
errors.checkArgument(
receiverT.equals(expectedReceiverT),
"Third argument must be %s, but is %s",
@@ -777,45 +787,46 @@ public class DoFnSignatures {
}
}
- /** Generates a type token for {@code Coder<T>} given {@code T}. */
- private static <T> TypeToken<Coder<T>> coderTypeOf(TypeToken<T> elementT) {
- return new TypeToken<Coder<T>>() {}.where(new TypeParameter<T>() {}, elementT);
+ /** Generates a {@link TypeDescriptor} for {@code Coder<T>} given {@code T}. */
+ private static <T> TypeDescriptor<Coder<T>> coderTypeOf(TypeDescriptor<T> elementT) {
+ return new TypeDescriptor<Coder<T>>() {}.where(new TypeParameter<T>() {}, elementT);
}
@VisibleForTesting
static DoFnSignature.GetRestrictionCoderMethod analyzeGetRestrictionCoderMethod(
- ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m) {
+ ErrorReporter errors, TypeDescriptor<? extends DoFn> fnT, Method m) {
errors.checkArgument(m.getParameterTypes().length == 0, "Must have zero arguments");
- TypeToken<?> resT = fnToken.resolveType(m.getGenericReturnType());
+ TypeDescriptor<?> resT = fnT.resolveType(m.getGenericReturnType());
errors.checkArgument(
- resT.isSubtypeOf(TypeToken.of(Coder.class)),
+ resT.isSubtypeOf(TypeDescriptor.of(Coder.class)),
"Must return a Coder, but returns %s",
formatType(resT));
return DoFnSignature.GetRestrictionCoderMethod.create(m, resT);
}
/**
- * Generates a type token for {@code RestrictionTracker<RestrictionT>} given {@code RestrictionT}.
+ * Generates a {@link TypeDescriptor} for {@code RestrictionTracker<RestrictionT>} given {@code
+ * RestrictionT}.
*/
private static <RestrictionT>
- TypeToken<RestrictionTracker<RestrictionT>> restrictionTrackerTypeOf(
- TypeToken<RestrictionT> restrictionT) {
- return new TypeToken<RestrictionTracker<RestrictionT>>() {}.where(
+ TypeDescriptor<RestrictionTracker<RestrictionT>> restrictionTrackerTypeOf(
+ TypeDescriptor<RestrictionT> restrictionT) {
+ return new TypeDescriptor<RestrictionTracker<RestrictionT>>() {}.where(
new TypeParameter<RestrictionT>() {}, restrictionT);
}
@VisibleForTesting
static DoFnSignature.NewTrackerMethod analyzeNewTrackerMethod(
- ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m) {
+ ErrorReporter errors, TypeDescriptor<? extends DoFn> fnT, Method m) {
// Method is of the form:
// @NewTracker
// TrackerT newTracker(RestrictionT restriction);
Type[] params = m.getGenericParameterTypes();
errors.checkArgument(params.length == 1, "Must have a single argument");
- TypeToken<?> restrictionT = fnToken.resolveType(params[0]);
- TypeToken<?> trackerT = fnToken.resolveType(m.getGenericReturnType());
- TypeToken<?> expectedTrackerT = restrictionTrackerTypeOf(restrictionT);
+ TypeDescriptor<?> restrictionT = fnT.resolveType(params[0]);
+ TypeDescriptor<?> trackerT = fnT.resolveType(m.getGenericReturnType());
+ TypeDescriptor<?> expectedTrackerT = restrictionTrackerTypeOf(restrictionT);
errors.checkArgument(
trackerT.isSubtypeOf(expectedTrackerT),
"Returns %s, but must return a subtype of %s",
@@ -985,7 +996,7 @@ public class DoFnSignatures {
return ReflectHelpers.METHOD_FORMATTER.apply(method);
}
- private static String formatType(TypeToken<?> t) {
+ private static String formatType(TypeDescriptor<?> t) {
return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 68278c5..573701b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -22,7 +22,6 @@ import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.err
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import com.google.common.reflect.TypeToken;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -34,6 +33,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -403,12 +403,12 @@ public class DoFnSignaturesSplittableDoFnTest {
"Third argument must be OutputReceiver<SomeRestriction>, but is OutputReceiver<String>");
DoFnSignatures.analyzeSplitRestrictionMethod(
errors(),
- TypeToken.of(FakeDoFn.class),
+ TypeDescriptor.of(FakeDoFn.class),
new AnonymousMethod() {
void method(
Integer element, SomeRestriction restriction, DoFn.OutputReceiver<String> receiver) {}
}.getMethod(),
- TypeToken.of(Integer.class));
+ TypeDescriptor.of(Integer.class));
}
@Test
@@ -422,14 +422,14 @@ public class DoFnSignaturesSplittableDoFnTest {
thrown.expectMessage("First argument must be the element type Integer");
DoFnSignatures.analyzeSplitRestrictionMethod(
errors(),
- TypeToken.of(FakeDoFn.class),
+ TypeDescriptor.of(FakeDoFn.class),
new AnonymousMethod() {
void method(
String element,
SomeRestriction restriction,
DoFn.OutputReceiver<SomeRestriction> receiver) {}
}.getMethod(),
- TypeToken.of(Integer.class));
+ TypeDescriptor.of(Integer.class));
}
@Test
@@ -437,7 +437,7 @@ public class DoFnSignaturesSplittableDoFnTest {
thrown.expectMessage("Must have exactly 3 arguments");
DoFnSignatures.analyzeSplitRestrictionMethod(
errors(),
- TypeToken.of(FakeDoFn.class),
+ TypeDescriptor.of(FakeDoFn.class),
new AnonymousMethod() {
private void method(
Integer element,
@@ -445,7 +445,7 @@ public class DoFnSignaturesSplittableDoFnTest {
DoFn.OutputReceiver<SomeRestriction> receiver,
Object extra) {}
}.getMethod(),
- TypeToken.of(Integer.class));
+ TypeDescriptor.of(Integer.class));
}
@Test
@@ -519,7 +519,7 @@ public class DoFnSignaturesSplittableDoFnTest {
thrown.expectMessage("Must have a single argument");
DoFnSignatures.analyzeNewTrackerMethod(
errors(),
- TypeToken.of(FakeDoFn.class),
+ TypeDescriptor.of(FakeDoFn.class),
new AnonymousMethod() {
private SomeRestrictionTracker method(SomeRestriction restriction, Object extra) {
return null;
@@ -533,7 +533,7 @@ public class DoFnSignaturesSplittableDoFnTest {
"Returns SomeRestrictionTracker, but must return a subtype of RestrictionTracker<String>");
DoFnSignatures.analyzeNewTrackerMethod(
errors(),
- TypeToken.of(FakeDoFn.class),
+ TypeDescriptor.of(FakeDoFn.class),
new AnonymousMethod() {
private SomeRestrictionTracker method(String restriction) {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index fe88c3b..52ecb2a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import com.google.common.reflect.TypeToken;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -66,12 +65,12 @@ public class DoFnSignaturesTest {
DoFnSignatures.analyzeBundleMethod(
errors(),
- TypeToken.of(FakeDoFn.class),
+ TypeDescriptor.of(FakeDoFn.class),
new DoFnSignaturesTestUtils.AnonymousMethod() {
void method(DoFn<Integer, String>.Context c, int n) {}
}.getMethod(),
- TypeToken.of(Integer.class),
- TypeToken.of(String.class));
+ TypeDescriptor.of(Integer.class),
+ TypeDescriptor.of(String.class));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
index ce00f2d..49e2ba7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
@@ -17,11 +17,11 @@
*/
package org.apache.beam.sdk.transforms.reflect;
-import com.google.common.reflect.TypeToken;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TypeDescriptor;
/** Utilities for use in {@link DoFnSignatures} tests. */
class DoFnSignaturesTestUtils {
@@ -57,10 +57,10 @@ class DoFnSignaturesTestUtils {
throws Exception {
return DoFnSignatures.analyzeProcessElementMethod(
errors(),
- TypeToken.of(FakeDoFn.class),
+ TypeDescriptor.of(FakeDoFn.class),
method.getMethod(),
- TypeToken.of(Integer.class),
- TypeToken.of(String.class),
+ TypeDescriptor.of(Integer.class),
+ TypeDescriptor.of(String.class),
Collections.EMPTY_MAP,
Collections.EMPTY_MAP);
}