You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/17 21:08:12 UTC

[04/14] beam git commit: Add RawPTransform.migrate(SdkComponents) for re-serialization

Add RawPTransform.migrate(SdkComponents) for re-serialization


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c14455ef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c14455ef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c14455ef

Branch: refs/heads/master
Commit: c14455ef4209a14a62f7b18f604c9673b32d06d7
Parents: 020ef14
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 2 20:28:16 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformTranslation.java     | 98 +++++++++-----------
 .../core/construction/PipelineTranslation.java  | 36 ++++---
 .../core/construction/SplittableParDo.java      |  8 ++
 .../core/SplittableParDoViaKeyedWorkItems.java  |  7 ++
 .../beam/runners/direct/DirectGroupByKey.java   | 14 +++
 .../beam/runners/direct/MultiStepCombine.java   | 18 +++-
 .../direct/ParDoMultiOverrideFactory.java       |  7 ++
 .../direct/TestStreamEvaluatorFactory.java      |  7 ++
 .../runners/direct/ViewOverrideFactory.java     |  8 ++
 9 files changed, 129 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index f9e7837..31767a0 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -66,8 +65,8 @@ public class PTransformTranslation {
   public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1";
 
   /**
-   * @deprecated runners should move away from translating `CreatePCollectionView` and treat this
-   * as part of the translation for a `ParDo` side input.
+   * @deprecated runners should move away from translating `CreatePCollectionView` and treat this as
+   *     part of the translation for a `ParDo` side input.
    */
   @Deprecated
   public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1";
@@ -85,8 +84,8 @@ public class PTransformTranslation {
       Map<Class<? extends PTransform>, TransformPayloadTranslator> newTranslators =
           (Map) registrar.getTransformPayloadTranslators();
 
-      Set<Class<? extends PTransform>> alreadyRegistered = Sets.intersection(
-          translators.keySet(), newTranslators.keySet());
+      Set<Class<? extends PTransform>> alreadyRegistered =
+          Sets.intersection(translators.keySet(), newTranslators.keySet());
 
       if (!alreadyRegistered.isEmpty()) {
         throw new IllegalArgumentException(
@@ -143,20 +142,13 @@ public class PTransformTranslation {
         DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform())));
 
     PTransform<?, ?> transform = appliedPTransform.getTransform();
+
     // A RawPTransform directly vends its payload. Because it will generally be
     // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
     if (transform instanceof RawPTransform) {
-      RawPTransform<?, ?> rawPTransform = (RawPTransform<?, ?>) transform;
-
-      if (rawPTransform.getUrn() != null) {
-        FunctionSpec.Builder payload = FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn());
-        @Nullable ByteString parameter = rawPTransform.getPayload();
-        if (parameter != null) {
-          payload.setPayload(parameter);
-        }
-        transformBuilder.setSpec(payload);
-      }
-      rawPTransform.registerComponents(components);
+      // The raw transform was parsed in the context of other components; this puts it in the
+      // context of our current serialization
+      transformBuilder.setSpec(((RawPTransform<?, ?>) transform).migrate(components));
     } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
       FunctionSpec payload =
           KNOWN_PAYLOAD_TRANSLATORS
@@ -186,9 +178,7 @@ public class PTransformTranslation {
     return tag.getId();
   }
 
-  /**
-   * Returns the URN for the transform if it is known, otherwise {@code null}.
-   */
+  /** Returns the URN for the transform if it is known, otherwise {@code null}. */
   @Nullable
   public static String urnForTransformOrNull(PTransform<?, ?> transform) {
 
@@ -205,9 +195,7 @@ public class PTransformTranslation {
     return translator.getUrn(transform);
   }
 
-  /**
-   * Returns the URN for the transform if it is known, otherwise throws.
-   */
+  /** Returns the URN for the transform if it is known, otherwise throws. */
   public static String urnForTransform(PTransform<?, ?> transform) {
     String urn = urnForTransformOrNull(transform);
     if (urn == null) {
@@ -235,24 +223,48 @@ public class PTransformTranslation {
    * #expand} method since the definition of the transform may be lost. The transform is already
    * fully expanded in the pipeline proto.
    */
-  public abstract static class RawPTransform<
-          InputT extends PInput, OutputT extends POutput>
+  public abstract static class RawPTransform<InputT extends PInput, OutputT extends POutput>
       extends PTransform<InputT, OutputT> {
 
+    /** The URN for this transform, if standardized. */
     @Nullable
-    public abstract String getUrn();
+    public String getUrn() {
+      return getSpec() == null ? null : getSpec().getUrn();
+    }
 
+    /** The payload for this transform, if any. */
     @Nullable
-    public ByteString getPayload() {
-      return null;
+    public abstract FunctionSpec getSpec();
+
+    /**
+     * Build a new payload set in the context of the given {@link SdkComponents}, if applicable.
+     *
+     * <p>When re-serializing this transform, the ids reference in the rehydrated payload may
+     * conflict with those defined by the serialization context. In that case, the components must
+     * be re-registered and a new payload returned.
+     */
+    public FunctionSpec migrate(SdkComponents components) throws IOException {
+      return getSpec();
     }
 
-    public void registerComponents(SdkComponents components) {}
+    /**
+     * By default, throws an exception, but can be overridden.
+     *
+     * <p>It is permissible for runner-specific transforms to be both a {@link RawPTransform} that
+     * directly vends its proto representation and also to expand, for convenience of not having to
+     * register a translator.
+     */
+    @Override
+    public OutputT expand(InputT input) {
+      throw new IllegalStateException(
+          String.format(
+              "%s should never be asked to expand;"
+                  + " it is the result of deserializing an already-constructed Pipeline",
+              getClass().getSimpleName()));
+    }
   }
 
-  /**
-   * A translator that uses the explicit URN and payload from a {@link RawPTransform}.
-   */
+  /** A translator that uses the explicit URN and payload from a {@link RawPTransform}. */
   public static class RawPTransformTranslator
       implements TransformPayloadTranslator<RawPTransform<?, ?>> {
     @Override
@@ -262,27 +274,9 @@ public class PTransformTranslation {
 
     @Override
     public FunctionSpec translate(
-        AppliedPTransform<?, ?, RawPTransform<?, ?>> transform,
-        SdkComponents components) {
-
-      // Anonymous composites have no spec
-      if (transform.getTransform().getUrn() == null) {
-        return null;
-      }
-
-      FunctionSpec.Builder transformSpec =
-          FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform()));
-
-      ByteString payload = transform.getTransform().getPayload();
-      if (payload != null) {
-        transformSpec.setPayload(payload);
-      }
-
-      // Transforms like Combine may have Coders that need to be added but do not
-      // occur in a black-box traversal
-      transform.getTransform().registerComponents(components);
-
-      return transformSpec.build();
+        AppliedPTransform<?, ?, RawPTransform<?, ?>> transform, SdkComponents components)
+        throws IOException {
+      return transform.getTransform().migrate(components);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 0aca837..1624865 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -24,7 +24,6 @@ import com.google.auto.value.AutoValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -105,8 +104,7 @@ public class PipelineTranslation {
     return DisplayData.from(component);
   }
 
-  public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto)
-      throws IOException {
+  public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto) throws IOException {
     TransformHierarchy transforms = new TransformHierarchy();
     Pipeline pipeline = Pipeline.forTransformHierarchy(transforms, PipelineOptionsFactory.create());
 
@@ -184,11 +182,7 @@ public class PipelineTranslation {
     }
 
     RehydratedPTransform transform =
-        RehydratedPTransform.of(
-            transformSpec.getUrn(),
-            transformSpec.getPayload(),
-            additionalInputs,
-            additionalCoders);
+        RehydratedPTransform.of(transformSpec, additionalInputs, additionalCoders);
 
     if (isPrimitive(transformProto)) {
       transforms.addFinalizedPrimitiveNode(
@@ -234,32 +228,33 @@ public class PipelineTranslation {
   private static boolean isPrimitive(RunnerApi.PTransform transformProto) {
     return transformProto.getSubtransformsCount() == 0
         && !transformProto
-        .getInputsMap()
-        .values()
-        .containsAll(transformProto.getOutputsMap().values());
+            .getInputsMap()
+            .values()
+            .containsAll(transformProto.getOutputsMap().values());
   }
 
   @AutoValue
   abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> {
 
     @Nullable
-    public abstract String getUrn();
-
-    @Nullable
-    public abstract ByteString getPayload();
+    public abstract RunnerApi.FunctionSpec getSpec();
 
     @Override
     public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
 
     public abstract List<Coder<?>> getCoders();
 
+    @Override
+    public String getUrn() {
+      return getSpec().getUrn();
+    }
+
     public static RehydratedPTransform of(
-        String urn,
-        ByteString payload,
+        RunnerApi.FunctionSpec payload,
         Map<TupleTag<?>, PValue> additionalInputs,
         List<Coder<?>> additionalCoders) {
       return new AutoValue_PipelineTranslation_RehydratedPTransform(
-          urn, payload, additionalInputs, additionalCoders);
+          payload, additionalInputs, additionalCoders);
     }
 
     @Override
@@ -275,12 +270,12 @@ public class PipelineTranslation {
     public String toString() {
       return MoreObjects.toStringHelper(this)
           .add("urn", getUrn())
-          .add("payload", getPayload())
+          .add("payload", getSpec())
           .toString();
     }
 
     @Override
-    public void registerComponents(SdkComponents components) {
+    public RunnerApi.FunctionSpec migrate(SdkComponents components) {
       for (Coder<?> coder : getCoders()) {
         try {
           components.registerCoder(coder);
@@ -288,6 +283,7 @@ public class PipelineTranslation {
           throw new RuntimeException(e);
         }
       }
+      return getSpec();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 32d3409..ab66e84 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
@@ -295,6 +297,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     public String getUrn() {
       return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN;
     }
+
+    @Nullable
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return null;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 251260e..400df19 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -87,6 +88,12 @@ public class SplittableParDoViaKeyedWorkItems {
     public String getUrn() {
       return SplittableParDo.SPLITTABLE_GBKIKWI_URN;
     }
+
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      throw new UnsupportedOperationException(
+          String.format("%s should never be serialized to proto", getClass().getSimpleName()));
+    }
   }
 
   /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 3ba04e7..9e56b65 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
@@ -92,6 +94,12 @@ class DirectGroupByKey<K, V>
     public String getUrn() {
       return DIRECT_GBKO_URN;
     }
+
+    @Nullable
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return null;
+    }
   }
 
   static final class DirectGroupAlsoByWindow<K, V>
@@ -141,5 +149,11 @@ class DirectGroupByKey<K, V>
     public String getUrn() {
       return DIRECT_GABW_URN;
     }
+
+    @Nullable
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
index ae21b4d..5253ef5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
@@ -28,7 +28,9 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.CombineTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
@@ -177,12 +179,18 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
     this.outputCoder = outputCoder;
   }
 
-  @Nullable
+  @Nonnull
   @Override
   public String getUrn() {
     return "urn:beam:directrunner:transforms:multistepcombine:v1";
   }
 
+  @Nullable
+  @Override
+  public RunnerApi.FunctionSpec getSpec() {
+    return null;
+  }
+
   @Override
   public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
     checkArgument(
@@ -337,11 +345,17 @@ class MultiStepCombine<K, InputT, AccumT, OutputT>
           input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), outputCoder);
     }
 
-    @Nullable
+    @Nonnull
     @Override
     public String getUrn() {
       return DIRECT_MERGE_ACCUMULATORS_EXTRACT_OUTPUT_URN;
     }
+
+    @Nullable
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return null;
+    }
   }
 
   static class MergeAndExtractAccumulatorOutputEvaluatorFactory

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 26f30b0..5ec52be 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
@@ -261,6 +262,12 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     public String getUrn() {
       return DIRECT_STATEFUL_PAR_DO_URN;
     }
+
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      throw new UnsupportedOperationException(
+          String.format("%s should never be serialized to proto", getClass().getSimpleName()));
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 49e7be7..d62b64c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.TestStreamTranslation;
@@ -218,6 +219,12 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
       public String getUrn() {
         return DIRECT_TEST_STREAM_URN;
       }
+
+      @Nullable
+      @Override
+      public RunnerApi.FunctionSpec getSpec() {
+        return null;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index c2255fe..61b7978 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.direct;
 
 import java.io.IOException;
 import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
@@ -128,6 +130,12 @@ class ViewOverrideFactory<ElemT, ViewT>
     public String getUrn() {
       return DIRECT_WRITE_VIEW_URN;
     }
+
+    @Nullable
+    @Override
+    public RunnerApi.FunctionSpec getSpec() {
+      return null;
+    }
   }
 
   public static final String DIRECT_WRITE_VIEW_URN =