You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/07 19:59:17 UTC

[27/50] incubator-beam git commit: Switch DoFnSignature, etc., from TypeToken to TypeDescriptor

Switch DoFnSignature, etc., from TypeToken to TypeDescriptor


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8336b24c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8336b24c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8336b24c

Branch: refs/heads/apex-runner
Commit: 8336b24c97c620fa3edb02301299080bda96379a
Parents: d936ed8
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 1 14:48:54 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Nov 3 21:32:53 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/reflect/DoFnInvokers.java    |   7 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  23 ++-
 .../sdk/transforms/reflect/DoFnSignatures.java  | 177 ++++++++++---------
 .../DoFnSignaturesSplittableDoFnTest.java       |  18 +-
 .../transforms/reflect/DoFnSignaturesTest.java  |   7 +-
 .../reflect/DoFnSignaturesTestUtils.java        |   8 +-
 6 files changed, 124 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index dd134b7..c5a23dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.reflect;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.reflect.TypeToken;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -263,9 +262,9 @@ public class DoFnInvokers {
 
   /** Default implementation of {@link DoFn.GetRestrictionCoder}, for delegation by bytebuddy. */
   public static class DefaultRestrictionCoder {
-    private final TypeToken<?> restrictionType;
+    private final TypeDescriptor<?> restrictionType;
 
-    DefaultRestrictionCoder(TypeToken<?> restrictionType) {
+    DefaultRestrictionCoder(TypeDescriptor<?> restrictionType) {
       this.restrictionType = restrictionType;
     }
 
@@ -273,7 +272,7 @@ public class DoFnInvokers {
     @SuppressWarnings({"unused", "unchecked"})
     public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry registry)
         throws CannotProvideCoderException {
-      return (Coder) registry.getCoder(TypeDescriptor.of(restrictionType.getType()));
+      return (Coder) registry.getCoder(restrictionType);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 71f7e53..6b98805 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.reflect;
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
-import com.google.common.reflect.TypeToken;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Collections;
@@ -342,7 +341,7 @@ public abstract class DoFnSignature {
 
     /** Concrete type of the {@link RestrictionTracker} parameter, if present. */
     @Nullable
-    abstract TypeToken<?> trackerT();
+    abstract TypeDescriptor<?> trackerT();
 
     /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */
     public abstract boolean hasReturnValue();
@@ -350,7 +349,7 @@ public abstract class DoFnSignature {
     static ProcessElementMethod create(
         Method targetMethod,
         List<Parameter> extraParameters,
-        TypeToken<?> trackerT,
+        TypeDescriptor<?> trackerT,
         boolean hasReturnValue) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
           targetMethod, Collections.unmodifiableList(extraParameters), trackerT, hasReturnValue);
@@ -462,9 +461,9 @@ public abstract class DoFnSignature {
     public abstract Method targetMethod();
 
     /** Type of the returned restriction. */
-    abstract TypeToken<?> restrictionT();
+    abstract TypeDescriptor<?> restrictionT();
 
-    static GetInitialRestrictionMethod create(Method targetMethod, TypeToken<?> restrictionT) {
+    static GetInitialRestrictionMethod create(Method targetMethod, TypeDescriptor<?> restrictionT) {
       return new AutoValue_DoFnSignature_GetInitialRestrictionMethod(targetMethod, restrictionT);
     }
   }
@@ -477,9 +476,9 @@ public abstract class DoFnSignature {
     public abstract Method targetMethod();
 
     /** Type of the restriction taken and returned. */
-    abstract TypeToken<?> restrictionT();
+    abstract TypeDescriptor<?> restrictionT();
 
-    static SplitRestrictionMethod create(Method targetMethod, TypeToken<?> restrictionT) {
+    static SplitRestrictionMethod create(Method targetMethod, TypeDescriptor<?> restrictionT) {
       return new AutoValue_DoFnSignature_SplitRestrictionMethod(targetMethod, restrictionT);
     }
   }
@@ -492,13 +491,13 @@ public abstract class DoFnSignature {
     public abstract Method targetMethod();
 
     /** Type of the input restriction. */
-    abstract TypeToken<?> restrictionT();
+    abstract TypeDescriptor<?> restrictionT();
 
     /** Type of the returned {@link RestrictionTracker}. */
-    abstract TypeToken<?> trackerT();
+    abstract TypeDescriptor<?> trackerT();
 
     static NewTrackerMethod create(
-        Method targetMethod, TypeToken<?> restrictionT, TypeToken<?> trackerT) {
+        Method targetMethod, TypeDescriptor<?> restrictionT, TypeDescriptor<?> trackerT) {
       return new AutoValue_DoFnSignature_NewTrackerMethod(targetMethod, restrictionT, trackerT);
     }
   }
@@ -511,9 +510,9 @@ public abstract class DoFnSignature {
     public abstract Method targetMethod();
 
     /** Type of the returned {@link Coder}. */
-    abstract TypeToken<?> coderT();
+    abstract TypeDescriptor<?> coderT();
 
-    static GetRestrictionCoderMethod create(Method targetMethod, TypeToken<?> coderT) {
+    static GetRestrictionCoderMethod create(Method targetMethod, TypeDescriptor<?> coderT) {
       return new AutoValue_DoFnSignature_GetRestrictionCoderMethod(targetMethod, coderT);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 5814c0e..c690ace 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -22,8 +22,6 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import com.google.common.reflect.TypeParameter;
-import com.google.common.reflect.TypeToken;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.AnnotatedElement;
 import java.lang.reflect.Field;
@@ -57,6 +55,7 @@ import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
 
 /**
  * Parses a {@link DoFn} and computes its {@link DoFnSignature}. See {@link #getSignature}.
@@ -90,18 +89,18 @@ public class DoFnSignatures {
     errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be subtype of DoFn");
     builder.setFnClass(fnClass);
 
-    TypeToken<? extends DoFn<?, ?>> fnToken = TypeToken.of(fnClass);
+    TypeDescriptor<? extends DoFn<?, ?>> fnT = TypeDescriptor.of(fnClass);
 
     // Extract the input and output type, and whether the fn is bounded.
-    TypeToken<?> inputT = null;
-    TypeToken<?> outputT = null;
-    for (TypeToken<?> supertype : fnToken.getTypes()) {
+    TypeDescriptor<?> inputT = null;
+    TypeDescriptor<?> outputT = null;
+    for (TypeDescriptor<?> supertype : fnT.getTypes()) {
       if (!supertype.getRawType().equals(DoFn.class)) {
         continue;
       }
       Type[] args = ((ParameterizedType) supertype.getType()).getActualTypeArguments();
-      inputT = TypeToken.of(args[0]);
-      outputT = TypeToken.of(args[1]);
+      inputT = TypeDescriptor.of(args[0]);
+      outputT = TypeDescriptor.of(args[1]);
     }
     errors.checkNotNull(inputT, "Unable to determine input type");
 
@@ -169,7 +168,7 @@ public class DoFnSignatures {
     DoFnSignature.ProcessElementMethod processElement =
         analyzeProcessElementMethod(
             processElementErrors,
-            fnToken,
+            fnT,
             processElementMethod,
             inputT,
             outputT,
@@ -180,14 +179,14 @@ public class DoFnSignatures {
     if (startBundleMethod != null) {
       ErrorReporter startBundleErrors = errors.forMethod(DoFn.StartBundle.class, startBundleMethod);
       builder.setStartBundle(
-          analyzeBundleMethod(startBundleErrors, fnToken, startBundleMethod, inputT, outputT));
+          analyzeBundleMethod(startBundleErrors, fnT, startBundleMethod, inputT, outputT));
     }
 
     if (finishBundleMethod != null) {
       ErrorReporter finishBundleErrors =
           errors.forMethod(DoFn.FinishBundle.class, finishBundleMethod);
       builder.setFinishBundle(
-          analyzeBundleMethod(finishBundleErrors, fnToken, finishBundleMethod, inputT, outputT));
+          analyzeBundleMethod(finishBundleErrors, fnT, finishBundleMethod, inputT, outputT));
     }
 
     if (setupMethod != null) {
@@ -209,7 +208,7 @@ public class DoFnSignatures {
       builder.setGetInitialRestriction(
           getInitialRestriction =
               analyzeGetInitialRestrictionMethod(
-                  getInitialRestrictionErrors, fnToken, getInitialRestrictionMethod, inputT));
+                  getInitialRestrictionErrors, fnT, getInitialRestrictionMethod, inputT));
     }
 
     DoFnSignature.SplitRestrictionMethod splitRestriction = null;
@@ -219,7 +218,7 @@ public class DoFnSignatures {
       builder.setSplitRestriction(
           splitRestriction =
               analyzeSplitRestrictionMethod(
-                  splitRestrictionErrors, fnToken, splitRestrictionMethod, inputT));
+                  splitRestrictionErrors, fnT, splitRestrictionMethod, inputT));
     }
 
     DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = null;
@@ -229,17 +228,17 @@ public class DoFnSignatures {
       builder.setGetRestrictionCoder(
           getRestrictionCoder =
               analyzeGetRestrictionCoderMethod(
-                  getRestrictionCoderErrors, fnToken, getRestrictionCoderMethod));
+                  getRestrictionCoderErrors, fnT, getRestrictionCoderMethod));
     }
 
     DoFnSignature.NewTrackerMethod newTracker = null;
     if (newTrackerMethod != null) {
       ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class, newTrackerMethod);
       builder.setNewTracker(
-          newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnToken, newTrackerMethod));
+          newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnT, newTrackerMethod));
     }
 
-    builder.setIsBoundedPerElement(inferBoundedness(fnToken, processElement, errors));
+    builder.setIsBoundedPerElement(inferBoundedness(fnT, processElement, errors));
 
     DoFnSignature signature = builder.build();
 
@@ -271,11 +270,11 @@ public class DoFnSignatures {
    * </ol>
    */
   private static PCollection.IsBounded inferBoundedness(
-      TypeToken<? extends DoFn> fnToken,
+      TypeDescriptor<? extends DoFn> fnT,
       DoFnSignature.ProcessElementMethod processElement,
       ErrorReporter errors) {
     PCollection.IsBounded isBounded = null;
-    for (TypeToken<?> supertype : fnToken.getTypes()) {
+    for (TypeDescriptor<?> supertype : fnT.getTypes()) {
       if (supertype.getRawType().isAnnotationPresent(DoFn.BoundedPerElement.class)
           || supertype.getRawType().isAnnotationPresent(DoFn.UnboundedPerElement.class)) {
         errors.checkArgument(
@@ -354,7 +353,7 @@ public class DoFnSignatures {
 
     ErrorReporter getInitialRestrictionErrors =
         errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod());
-    TypeToken<?> restrictionT = getInitialRestriction.restrictionT();
+    TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
 
     getInitialRestrictionErrors.checkArgument(
         restrictionT.equals(newTracker.restrictionT()),
@@ -411,49 +410,54 @@ public class DoFnSignatures {
   }
 
   /**
-   * Generates a type token for {@code DoFn<InputT, OutputT>.ProcessContext} given {@code InputT}
-   * and {@code OutputT}.
+   * Generates a {@link TypeDescriptor} for {@code DoFn<InputT, OutputT>.ProcessContext} given
+   * {@code InputT} and {@code OutputT}.
    */
   private static <InputT, OutputT>
-      TypeToken<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf(
-          TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
-    return new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {}.where(
+      TypeDescriptor<DoFn<InputT, OutputT>.ProcessContext> doFnProcessContextTypeOf(
+          TypeDescriptor<InputT> inputT, TypeDescriptor<OutputT> outputT) {
+    return new TypeDescriptor<DoFn<InputT, OutputT>.ProcessContext>() {}.where(
             new TypeParameter<InputT>() {}, inputT)
         .where(new TypeParameter<OutputT>() {}, outputT);
   }
 
   /**
-   * Generates a type token for {@code DoFn<InputT, OutputT>.Context} given {@code InputT} and
-   * {@code OutputT}.
+   * Generates a {@link TypeDescriptor} for {@code DoFn<InputT, OutputT>.Context} given {@code
+   * InputT} and {@code OutputT}.
    */
-  private static <InputT, OutputT> TypeToken<DoFn<InputT, OutputT>.Context> doFnContextTypeOf(
-      TypeToken<InputT> inputT, TypeToken<OutputT> outputT) {
-    return new TypeToken<DoFn<InputT, OutputT>.Context>() {}.where(
+  private static <InputT, OutputT> TypeDescriptor<DoFn<InputT, OutputT>.Context> doFnContextTypeOf(
+      TypeDescriptor<InputT> inputT, TypeDescriptor<OutputT> outputT) {
+    return new TypeDescriptor<DoFn<InputT, OutputT>.Context>() {}.where(
             new TypeParameter<InputT>() {}, inputT)
         .where(new TypeParameter<OutputT>() {}, outputT);
   }
 
-  /** Generates a type token for {@code DoFn.InputProvider<InputT>} given {@code InputT}. */
-  private static <InputT> TypeToken<DoFn.InputProvider<InputT>> inputProviderTypeOf(
-      TypeToken<InputT> inputT) {
-    return new TypeToken<DoFn.InputProvider<InputT>>() {}.where(
+  /**
+   * Generates a {@link TypeDescriptor} for {@code DoFn.InputProvider<InputT>} given {@code InputT}.
+   */
+  private static <InputT> TypeDescriptor<DoFn.InputProvider<InputT>> inputProviderTypeOf(
+      TypeDescriptor<InputT> inputT) {
+    return new TypeDescriptor<DoFn.InputProvider<InputT>>() {}.where(
         new TypeParameter<InputT>() {}, inputT);
   }
 
-  /** Generates a type token for {@code DoFn.OutputReceiver<OutputT>} given {@code OutputT}. */
-  private static <OutputT> TypeToken<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf(
-      TypeToken<OutputT> inputT) {
-    return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
+  /**
+   * Generates a {@link TypeDescriptor} for {@code DoFn.OutputReceiver<OutputT>} given {@code
+   * OutputT}.
+   */
+  private static <OutputT> TypeDescriptor<DoFn.OutputReceiver<OutputT>> outputReceiverTypeOf(
+      TypeDescriptor<OutputT> inputT) {
+    return new TypeDescriptor<DoFn.OutputReceiver<OutputT>>() {}.where(
         new TypeParameter<OutputT>() {}, inputT);
   }
 
   @VisibleForTesting
   static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
       ErrorReporter errors,
-      TypeToken<? extends DoFn<?, ?>> fnClass,
+      TypeDescriptor<? extends DoFn<?, ?>> fnClass,
       Method m,
-      TypeToken<?> inputT,
-      TypeToken<?> outputT,
+      TypeDescriptor<?> inputT,
+      TypeDescriptor<?> outputT,
       Map<String, StateDeclaration> stateDeclarations,
       Map<String, TimerDeclaration> timerDeclarations) {
     errors.checkArgument(
@@ -462,27 +466,27 @@ public class DoFnSignatures {
         "Must return void or %s",
         DoFn.ProcessContinuation.class.getSimpleName());
 
-    TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT);
+    TypeDescriptor<?> processContextT = doFnProcessContextTypeOf(inputT, outputT);
 
     Type[] params = m.getGenericParameterTypes();
-    TypeToken<?> contextToken = null;
+    TypeDescriptor<?> contextT = null;
     if (params.length > 0) {
-      contextToken = fnClass.resolveType(params[0]);
+      contextT = fnClass.resolveType(params[0]);
     }
     errors.checkArgument(
-        contextToken != null && contextToken.equals(processContextToken),
+        contextT != null && contextT.equals(processContextT),
         "Must take %s as the first argument",
-        formatType(processContextToken));
+        formatType(processContextT));
 
     List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
     Map<String, DoFnSignature.Parameter> stateParameters = new HashMap<>();
     Map<String, DoFnSignature.Parameter> timerParameters = new HashMap<>();
-    TypeToken<?> trackerT = null;
+    TypeDescriptor<?> trackerT = null;
 
-    TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
-    TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
+    TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT);
+    TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
     for (int i = 1; i < params.length; ++i) {
-      TypeToken<?> paramT = fnClass.resolveType(params[i]);
+      TypeDescriptor<?> paramT = fnClass.resolveType(params[i]);
       Class<?> rawType = paramT.getRawType();
       if (rawType.equals(BoundedWindow.class)) {
         errors.checkArgument(
@@ -641,8 +645,8 @@ public class DoFnSignatures {
       } else {
         List<String> allowedParamTypes =
             Arrays.asList(
-                formatType(new TypeToken<BoundedWindow>() {}),
-                formatType(new TypeToken<RestrictionTracker<?>>() {}));
+                formatType(new TypeDescriptor<BoundedWindow>() {}),
+                formatType(new TypeDescriptor<RestrictionTracker<?>>() {}));
         errors.throwIllegalArgument(
             "%s is not a valid context parameter. Should be one of %s",
             formatType(paramT), allowedParamTypes);
@@ -665,17 +669,17 @@ public class DoFnSignatures {
   @VisibleForTesting
   static DoFnSignature.BundleMethod analyzeBundleMethod(
       ErrorReporter errors,
-      TypeToken<? extends DoFn<?, ?>> fnToken,
+      TypeDescriptor<? extends DoFn<?, ?>> fnT,
       Method m,
-      TypeToken<?> inputT,
-      TypeToken<?> outputT) {
+      TypeDescriptor<?> inputT,
+      TypeDescriptor<?> outputT) {
     errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
-    TypeToken<?> expectedContextToken = doFnContextTypeOf(inputT, outputT);
+    TypeDescriptor<?> expectedContextT = doFnContextTypeOf(inputT, outputT);
     Type[] params = m.getGenericParameterTypes();
     errors.checkArgument(
-        params.length == 1 && fnToken.resolveType(params[0]).equals(expectedContextToken),
+        params.length == 1 && fnT.resolveType(params[0]).equals(expectedContextT),
         "Must take a single argument of type %s",
-        formatType(expectedContextToken));
+        formatType(expectedContextT));
     return DoFnSignature.BundleMethod.create(m);
   }
 
@@ -688,27 +692,33 @@ public class DoFnSignatures {
 
   @VisibleForTesting
   static DoFnSignature.GetInitialRestrictionMethod analyzeGetInitialRestrictionMethod(
-      ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT) {
+      ErrorReporter errors,
+      TypeDescriptor<? extends DoFn> fnT,
+      Method m,
+      TypeDescriptor<?> inputT) {
     // Method is of the form:
     // @GetInitialRestriction
     // RestrictionT getInitialRestriction(InputT element);
     Type[] params = m.getGenericParameterTypes();
     errors.checkArgument(
-        params.length == 1 && fnToken.resolveType(params[0]).equals(inputT),
+        params.length == 1 && fnT.resolveType(params[0]).equals(inputT),
         "Must take a single argument of type %s",
         formatType(inputT));
     return DoFnSignature.GetInitialRestrictionMethod.create(
-        m, fnToken.resolveType(m.getGenericReturnType()));
+        m, fnT.resolveType(m.getGenericReturnType()));
   }
 
-  /** Generates a type token for {@code List<T>} given {@code T}. */
-  private static <T> TypeToken<List<T>> listTypeOf(TypeToken<T> elementT) {
-    return new TypeToken<List<T>>() {}.where(new TypeParameter<T>() {}, elementT);
+  /** Generates a {@link TypeDescriptor} for {@code List<T>} given {@code T}. */
+  private static <T> TypeDescriptor<List<T>> listTypeOf(TypeDescriptor<T> elementT) {
+    return new TypeDescriptor<List<T>>() {}.where(new TypeParameter<T>() {}, elementT);
   }
 
   @VisibleForTesting
   static DoFnSignature.SplitRestrictionMethod analyzeSplitRestrictionMethod(
-      ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT) {
+      ErrorReporter errors,
+      TypeDescriptor<? extends DoFn> fnT,
+      Method m,
+      TypeDescriptor<?> inputT) {
     // Method is of the form:
     // @SplitRestriction
     // void splitRestriction(InputT element, RestrictionT restriction);
@@ -717,13 +727,13 @@ public class DoFnSignatures {
     Type[] params = m.getGenericParameterTypes();
     errors.checkArgument(params.length == 3, "Must have exactly 3 arguments");
     errors.checkArgument(
-        fnToken.resolveType(params[0]).equals(inputT),
+        fnT.resolveType(params[0]).equals(inputT),
         "First argument must be the element type %s",
         formatType(inputT));
 
-    TypeToken<?> restrictionT = fnToken.resolveType(params[1]);
-    TypeToken<?> receiverT = fnToken.resolveType(params[2]);
-    TypeToken<?> expectedReceiverT = outputReceiverTypeOf(restrictionT);
+    TypeDescriptor<?> restrictionT = fnT.resolveType(params[1]);
+    TypeDescriptor<?> receiverT = fnT.resolveType(params[2]);
+    TypeDescriptor<?> expectedReceiverT = outputReceiverTypeOf(restrictionT);
     errors.checkArgument(
         receiverT.equals(expectedReceiverT),
         "Third argument must be %s, but is %s",
@@ -777,45 +787,46 @@ public class DoFnSignatures {
     }
   }
 
-  /** Generates a type token for {@code Coder<T>} given {@code T}. */
-  private static <T> TypeToken<Coder<T>> coderTypeOf(TypeToken<T> elementT) {
-    return new TypeToken<Coder<T>>() {}.where(new TypeParameter<T>() {}, elementT);
+  /** Generates a {@link TypeDescriptor} for {@code Coder<T>} given {@code T}. */
+  private static <T> TypeDescriptor<Coder<T>> coderTypeOf(TypeDescriptor<T> elementT) {
+    return new TypeDescriptor<Coder<T>>() {}.where(new TypeParameter<T>() {}, elementT);
   }
 
   @VisibleForTesting
   static DoFnSignature.GetRestrictionCoderMethod analyzeGetRestrictionCoderMethod(
-      ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m) {
+      ErrorReporter errors, TypeDescriptor<? extends DoFn> fnT, Method m) {
     errors.checkArgument(m.getParameterTypes().length == 0, "Must have zero arguments");
-    TypeToken<?> resT = fnToken.resolveType(m.getGenericReturnType());
+    TypeDescriptor<?> resT = fnT.resolveType(m.getGenericReturnType());
     errors.checkArgument(
-        resT.isSubtypeOf(TypeToken.of(Coder.class)),
+        resT.isSubtypeOf(TypeDescriptor.of(Coder.class)),
         "Must return a Coder, but returns %s",
         formatType(resT));
     return DoFnSignature.GetRestrictionCoderMethod.create(m, resT);
   }
 
   /**
-   * Generates a type token for {@code RestrictionTracker<RestrictionT>} given {@code RestrictionT}.
+   * Generates a {@link TypeDescriptor} for {@code RestrictionTracker<RestrictionT>} given {@code
+   * RestrictionT}.
    */
   private static <RestrictionT>
-      TypeToken<RestrictionTracker<RestrictionT>> restrictionTrackerTypeOf(
-          TypeToken<RestrictionT> restrictionT) {
-    return new TypeToken<RestrictionTracker<RestrictionT>>() {}.where(
+      TypeDescriptor<RestrictionTracker<RestrictionT>> restrictionTrackerTypeOf(
+          TypeDescriptor<RestrictionT> restrictionT) {
+    return new TypeDescriptor<RestrictionTracker<RestrictionT>>() {}.where(
         new TypeParameter<RestrictionT>() {}, restrictionT);
   }
 
   @VisibleForTesting
   static DoFnSignature.NewTrackerMethod analyzeNewTrackerMethod(
-      ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m) {
+      ErrorReporter errors, TypeDescriptor<? extends DoFn> fnT, Method m) {
     // Method is of the form:
     // @NewTracker
     // TrackerT newTracker(RestrictionT restriction);
     Type[] params = m.getGenericParameterTypes();
     errors.checkArgument(params.length == 1, "Must have a single argument");
 
-    TypeToken<?> restrictionT = fnToken.resolveType(params[0]);
-    TypeToken<?> trackerT = fnToken.resolveType(m.getGenericReturnType());
-    TypeToken<?> expectedTrackerT = restrictionTrackerTypeOf(restrictionT);
+    TypeDescriptor<?> restrictionT = fnT.resolveType(params[0]);
+    TypeDescriptor<?> trackerT = fnT.resolveType(m.getGenericReturnType());
+    TypeDescriptor<?> expectedTrackerT = restrictionTrackerTypeOf(restrictionT);
     errors.checkArgument(
         trackerT.isSubtypeOf(expectedTrackerT),
         "Returns %s, but must return a subtype of %s",
@@ -985,7 +996,7 @@ public class DoFnSignatures {
     return ReflectHelpers.METHOD_FORMATTER.apply(method);
   }
 
-  private static String formatType(TypeToken<?> t) {
+  private static String formatType(TypeDescriptor<?> t) {
     return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(t.getType());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 68278c5..573701b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -22,7 +22,6 @@ import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.err
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.reflect.TypeToken;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -34,6 +33,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -403,12 +403,12 @@ public class DoFnSignaturesSplittableDoFnTest {
         "Third argument must be OutputReceiver<SomeRestriction>, but is OutputReceiver<String>");
     DoFnSignatures.analyzeSplitRestrictionMethod(
         errors(),
-        TypeToken.of(FakeDoFn.class),
+        TypeDescriptor.of(FakeDoFn.class),
         new AnonymousMethod() {
           void method(
               Integer element, SomeRestriction restriction, DoFn.OutputReceiver<String> receiver) {}
         }.getMethod(),
-        TypeToken.of(Integer.class));
+        TypeDescriptor.of(Integer.class));
   }
 
   @Test
@@ -422,14 +422,14 @@ public class DoFnSignaturesSplittableDoFnTest {
     thrown.expectMessage("First argument must be the element type Integer");
     DoFnSignatures.analyzeSplitRestrictionMethod(
         errors(),
-        TypeToken.of(FakeDoFn.class),
+        TypeDescriptor.of(FakeDoFn.class),
         new AnonymousMethod() {
           void method(
               String element,
               SomeRestriction restriction,
               DoFn.OutputReceiver<SomeRestriction> receiver) {}
         }.getMethod(),
-        TypeToken.of(Integer.class));
+        TypeDescriptor.of(Integer.class));
   }
 
   @Test
@@ -437,7 +437,7 @@ public class DoFnSignaturesSplittableDoFnTest {
     thrown.expectMessage("Must have exactly 3 arguments");
     DoFnSignatures.analyzeSplitRestrictionMethod(
         errors(),
-        TypeToken.of(FakeDoFn.class),
+        TypeDescriptor.of(FakeDoFn.class),
         new AnonymousMethod() {
           private void method(
               Integer element,
@@ -445,7 +445,7 @@ public class DoFnSignaturesSplittableDoFnTest {
               DoFn.OutputReceiver<SomeRestriction> receiver,
               Object extra) {}
         }.getMethod(),
-        TypeToken.of(Integer.class));
+        TypeDescriptor.of(Integer.class));
   }
 
   @Test
@@ -519,7 +519,7 @@ public class DoFnSignaturesSplittableDoFnTest {
     thrown.expectMessage("Must have a single argument");
     DoFnSignatures.analyzeNewTrackerMethod(
         errors(),
-        TypeToken.of(FakeDoFn.class),
+        TypeDescriptor.of(FakeDoFn.class),
         new AnonymousMethod() {
           private SomeRestrictionTracker method(SomeRestriction restriction, Object extra) {
             return null;
@@ -533,7 +533,7 @@ public class DoFnSignaturesSplittableDoFnTest {
         "Returns SomeRestrictionTracker, but must return a subtype of RestrictionTracker<String>");
     DoFnSignatures.analyzeNewTrackerMethod(
         errors(),
-        TypeToken.of(FakeDoFn.class),
+        TypeDescriptor.of(FakeDoFn.class),
         new AnonymousMethod() {
           private SomeRestrictionTracker method(String restriction) {
             return null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index fe88c3b..52ecb2a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
-import com.google.common.reflect.TypeToken;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
@@ -66,12 +65,12 @@ public class DoFnSignaturesTest {
 
     DoFnSignatures.analyzeBundleMethod(
         errors(),
-        TypeToken.of(FakeDoFn.class),
+        TypeDescriptor.of(FakeDoFn.class),
         new DoFnSignaturesTestUtils.AnonymousMethod() {
           void method(DoFn<Integer, String>.Context c, int n) {}
         }.getMethod(),
-        TypeToken.of(Integer.class),
-        TypeToken.of(String.class));
+        TypeDescriptor.of(Integer.class),
+        TypeDescriptor.of(String.class));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8336b24c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
index ce00f2d..49e2ba7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTestUtils.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
-import com.google.common.reflect.TypeToken;
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /** Utilities for use in {@link DoFnSignatures} tests. */
 class DoFnSignaturesTestUtils {
@@ -57,10 +57,10 @@ class DoFnSignaturesTestUtils {
       throws Exception {
     return DoFnSignatures.analyzeProcessElementMethod(
         errors(),
-        TypeToken.of(FakeDoFn.class),
+        TypeDescriptor.of(FakeDoFn.class),
         method.getMethod(),
-        TypeToken.of(Integer.class),
-        TypeToken.of(String.class),
+        TypeDescriptor.of(Integer.class),
+        TypeDescriptor.of(String.class),
         Collections.EMPTY_MAP,
         Collections.EMPTY_MAP);
   }