You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/04/20 11:15:30 UTC

[beam] 01/01: [BEAM-9147] Make VideoIntelligence use PTransform on user-facing API

This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch BEAM-9147-use-ptransforms
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 31117953cb7ca7728099be0d1a1c7410dbc14062
Author: Michal Walenia <mi...@polidea.com>
AuthorDate: Mon Apr 20 13:14:48 2020 +0200

    [BEAM-9147] Make VideoIntelligence use PTransform on user-facing API
---
 .../beam/sdk/extensions/ml/AnnotateVideo.java      |  73 +++-----------
 .../ml/AnnotateVideoBytesWithContextFn.java        |  50 ++++++++++
 .../{AnnotateVideo.java => AnnotateVideoFn.java}   |  10 +-
 .../extensions/ml/AnnotateVideoFromBytesFn.java    |  55 +++++++++++
 .../sdk/extensions/ml/AnnotateVideoFromURIFn.java  |  54 ++++++++++
 .../ml/AnnotateVideoURIWithContextFn.java          |  49 +++++++++
 .../beam/sdk/extensions/ml/VideoIntelligence.java  | 109 +++++++++------------
 .../beam/sdk/extensions/ml/package-info.java       |   3 +
 .../beam/sdk/extensions/ml/AnnotateVideoTest.java  |  16 ++-
 .../sdk/extensions/ml/VideoIntelligenceIT.java     |   4 +-
 10 files changed, 285 insertions(+), 138 deletions(-)

diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
index 56e8638..3cedb62 100644
--- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
@@ -17,87 +17,44 @@
  */
 package org.apache.beam.sdk.extensions.ml;
 
-import com.google.api.gax.longrunning.OperationFuture;
-import com.google.cloud.videointelligence.v1.AnnotateVideoProgress;
-import com.google.cloud.videointelligence.v1.AnnotateVideoRequest;
-import com.google.cloud.videointelligence.v1.AnnotateVideoResponse;
 import com.google.cloud.videointelligence.v1.Feature;
 import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
 import com.google.cloud.videointelligence.v1.VideoContext;
-import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
- * Base class for Video Intelligence transform.
+ * Base class for VideoIntelligence PTransform.
  *
- * @param <T> Class of input data being passed in - either ByteString - video data encoded into.
- *     String or String - a GCS URI of the video to be annotated.
+ * @param <T> Type of input PCollection contents.
  */
-public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> {
-
+@Experimental
+public abstract class AnnotateVideo<T>
+    extends PTransform<PCollection<T>, PCollection<List<VideoAnnotationResults>>> {
   protected final PCollectionView<Map<T, VideoContext>> contextSideInput;
   protected final List<Feature> featureList;
-  VideoIntelligenceServiceClient videoIntelligenceServiceClient;
 
-  public AnnotateVideo(
+  protected AnnotateVideo(
       PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) {
     this.contextSideInput = contextSideInput;
     this.featureList = featureList;
   }
 
-  public AnnotateVideo(List<Feature> featureList) {
-    contextSideInput = null;
+  protected AnnotateVideo(List<Feature> featureList) {
+    this.contextSideInput = null;
     this.featureList = featureList;
   }
 
-  @StartBundle
-  public void startBundle() throws IOException {
-    videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create();
-  }
-
-  @Teardown
-  public void teardown() {
-    videoIntelligenceServiceClient.close();
-  }
-
   /**
-   * Call the Video Intelligence Cloud AI service and return annotation results.
+   * To be implemented based on input PCollection's content type.
    *
-   * @param elementURI This or elementContents is required. GCS address of video to be annotated
-   * @param elementContents this or elementURI is required. Hex-encoded contents of video to be
-   *     annotated
-   * @param videoContext Optional context for video annotation.
+   * @param input PCollection whose elements are to be annotated
    * @return
    */
-  List<VideoAnnotationResults> getVideoAnnotationResults(
-      String elementURI, ByteString elementContents, VideoContext videoContext)
-      throws InterruptedException, ExecutionException {
-    AnnotateVideoRequest.Builder requestBuilder =
-        AnnotateVideoRequest.newBuilder().addAllFeatures(featureList);
-    if (elementURI != null) {
-      requestBuilder.setInputUri(elementURI);
-    } else if (elementContents != null) {
-      requestBuilder.setInputContent(elementContents);
-    } else {
-      throw new IllegalArgumentException("Either elementURI or elementContents should be non-null");
-    }
-    if (videoContext != null) {
-      requestBuilder.setVideoContext(videoContext);
-    }
-    AnnotateVideoRequest annotateVideoRequest = requestBuilder.build();
-    OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> annotateVideoAsync =
-        videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest);
-    return annotateVideoAsync.get().getAnnotationResultsList();
-  }
-
-  /** Process element implementation required. */
-  @ProcessElement
-  public abstract void processElement(ProcessContext context)
-      throws ExecutionException, InterruptedException;
+  @Override
+  public abstract PCollection<List<VideoAnnotationResults>> expand(PCollection<T> input);
 }
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java
new file mode 100644
index 0000000..c8edcd1
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoContext;
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the
+ * ByteString-encoded video contents; values are the VideoContext objects.
+ */
+@Experimental
+class AnnotateVideoBytesWithContextFn extends AnnotateVideoFn<KV<ByteString, VideoContext>> {
+
+  public AnnotateVideoBytesWithContextFn(List<Feature> featureList) {
+    super(featureList);
+  }
+
+  /** ProcessElement implementation. */
+  @Override
+  public void processElement(ProcessContext context)
+      throws ExecutionException, InterruptedException {
+    ByteString element = context.element().getKey();
+    VideoContext videoContext = context.element().getValue();
+    List<VideoAnnotationResults> videoAnnotationResults =
+        getVideoAnnotationResults(null, element, videoContext);
+    context.output(videoAnnotationResults);
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java
similarity index 93%
copy from sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
copy to sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java
index 56e8638..3cf0c30 100644
--- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java
@@ -30,28 +30,30 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
- * Base class for Video Intelligence transform.
+ * Base class for DoFns used in VideoIntelligence transforms.
  *
  * @param <T> Class of input data being passed in - either ByteString - video data encoded into.
  *     String or String - a GCS URI of the video to be annotated.
  */
-public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> {
+@Experimental
+abstract class AnnotateVideoFn<T> extends DoFn<T, List<VideoAnnotationResults>> {
 
   protected final PCollectionView<Map<T, VideoContext>> contextSideInput;
   protected final List<Feature> featureList;
   VideoIntelligenceServiceClient videoIntelligenceServiceClient;
 
-  public AnnotateVideo(
+  public AnnotateVideoFn(
       PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) {
     this.contextSideInput = contextSideInput;
     this.featureList = featureList;
   }
 
-  public AnnotateVideo(List<Feature> featureList) {
+  public AnnotateVideoFn(List<Feature> featureList) {
     contextSideInput = null;
     this.featureList = featureList;
   }
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java
new file mode 100644
index 0000000..20aa083
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoContext;
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Implementation of AnnotateVideoFn accepting ByteStrings as contents of input PCollection. Videos
+ * decoded from the ByteStrings are annotated.
+ */
+@Experimental
+class AnnotateVideoFromBytesFn extends AnnotateVideoFn<ByteString> {
+
+  public AnnotateVideoFromBytesFn(
+      PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) {
+    super(contextSideInput, featureList);
+  }
+
+  /** Implementation of ProcessElement. */
+  @Override
+  public void processElement(ProcessContext context)
+      throws ExecutionException, InterruptedException {
+    ByteString element = context.element();
+    VideoContext videoContext = null;
+    if (contextSideInput != null) {
+      videoContext = context.sideInput(contextSideInput).get(element);
+    }
+    List<VideoAnnotationResults> videoAnnotationResults =
+        getVideoAnnotationResults(null, element, videoContext);
+    context.output(videoAnnotationResults);
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java
new file mode 100644
index 0000000..5dfea0c
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoContext;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Implementation of AnnotateVideoFn accepting Strings as contents of input PCollection. Annotates
+ * videos found on GCS based on URIs from input PCollection.
+ */
+@Experimental
+class AnnotateVideoFromURIFn extends AnnotateVideoFn<String> {
+
+  public AnnotateVideoFromURIFn(
+      PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) {
+    super(contextSideInput, featureList);
+  }
+
+  /** ProcessElement implementation. */
+  @Override
+  public void processElement(ProcessContext context)
+      throws ExecutionException, InterruptedException {
+    String elementURI = context.element();
+    VideoContext videoContext = null;
+    if (contextSideInput != null) {
+      videoContext = context.sideInput(contextSideInput).get(elementURI);
+    }
+    List<VideoAnnotationResults> annotationResultsList =
+        getVideoAnnotationResults(elementURI, null, videoContext);
+    context.output(annotationResultsList);
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java
new file mode 100644
index 0000000..a165d5a
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoContext;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the
+ * GCS URIs; values are the VideoContext objects.
+ */
+@Experimental
+class AnnotateVideoURIWithContextFn extends AnnotateVideoFn<KV<String, VideoContext>> {
+
+  public AnnotateVideoURIWithContextFn(List<Feature> featureList) {
+    super(featureList);
+  }
+
+  /** ProcessElement implementation. */
+  @Override
+  public void processElement(ProcessContext context)
+      throws ExecutionException, InterruptedException {
+    String elementURI = context.element().getKey();
+    VideoContext videoContext = context.element().getValue();
+    List<VideoAnnotationResults> videoAnnotationResults =
+        getVideoAnnotationResults(elementURI, null, videoContext);
+    context.output(videoAnnotationResults);
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
index 0f447da..5fa25d6 100644
--- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
@@ -23,8 +23,10 @@ import com.google.cloud.videointelligence.v1.VideoContext;
 import com.google.protobuf.ByteString;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
@@ -37,6 +39,7 @@ import org.apache.beam.sdk.values.PCollectionView;
  *
  * <p>Service account with proper permissions is required to use these transforms.
  */
+@Experimental
 public class VideoIntelligence {
 
   /**
@@ -44,11 +47,11 @@ public class VideoIntelligence {
    *
    * @param featureList List of features to be annotated
    * @param contextSideInput Optional side input with map of contexts to URIs
-   * @return DoFn performing the necessary operations
+   * @return PTransform performing the necessary operations
    */
-  public static AnnotateVideoFromURI annotateFromURI(
+  public static AnnotateVideoFromUri annotateFromURI(
       List<Feature> featureList, PCollectionView<Map<String, VideoContext>> contextSideInput) {
-    return new AnnotateVideoFromURI(contextSideInput, featureList);
+    return new AnnotateVideoFromUri(contextSideInput, featureList);
   }
 
   /**
@@ -56,7 +59,7 @@ public class VideoIntelligence {
    *
    * @param featureList List of features to be annotated
    * @param contextSideInput Optional side input with map of contexts to ByteStrings
-   * @return DoFn performing the necessary operations
+   * @return PTransform performing the necessary operations
    */
   public static AnnotateVideoFromBytes annotateFromBytes(
       PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) {
@@ -67,118 +70,96 @@ public class VideoIntelligence {
    * Annotates videos from key-value pairs of GCS URI and VideoContext.
    *
    * @param featureList List of features to be annotated
-   * @return DoFn performing the necessary operations
+   * @return PTransform performing the necessary operations
    */
-  public static AnnotateVideoURIWithContext annotateFromUriWithContext(List<Feature> featureList) {
-    return new AnnotateVideoURIWithContext(featureList);
+  public static AnnotateVideoFromURIWithContext annotateFromUriWithContext(
+      List<Feature> featureList) {
+    return new AnnotateVideoFromURIWithContext(featureList);
   }
 
   /**
    * Annotates videos from key-value pairs of ByteStrings and VideoContext.
    *
    * @param featureList List of features to be annotated
-   * @return DoFn performing the necessary operations
+   * @return PTransform performing the necessary operations
    */
-  public static AnnotateVideoBytesWithContext annotateFromBytesWithContext(
+  public static AnnotateVideoFromBytesWithContext annotateFromBytesWithContext(
       List<Feature> featureList) {
-    return new AnnotateVideoBytesWithContext(featureList);
+    return new AnnotateVideoFromBytesWithContext(featureList);
   }
 
   /**
-   * Implementation of AnnotateVideo accepting Strings as contents of input PCollection. Annotates
-   * videos found on GCS based on URIs from input PCollection.
+   * Implementation of {@link AnnotateVideo} taking a PCollection of {@link String} and an optional
+   * side input with a context map.
    */
-  public static class AnnotateVideoFromURI extends AnnotateVideo<String> {
+  @Experimental
+  public static class AnnotateVideoFromUri extends AnnotateVideo<String> {
 
-    public AnnotateVideoFromURI(
+    protected AnnotateVideoFromUri(
         PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) {
       super(contextSideInput, featureList);
     }
 
-    /** ProcessElement implementation. */
     @Override
-    public void processElement(ProcessContext context)
-        throws ExecutionException, InterruptedException {
-      String elementURI = context.element();
-      VideoContext videoContext = null;
-      if (contextSideInput != null) {
-        videoContext = context.sideInput(contextSideInput).get(elementURI);
-      }
-      List<VideoAnnotationResults> annotationResultsList =
-          getVideoAnnotationResults(elementURI, null, videoContext);
-      context.output(annotationResultsList);
+    public PCollection<List<VideoAnnotationResults>> expand(PCollection<String> input) {
+      return input.apply(ParDo.of(new AnnotateVideoFromURIFn(contextSideInput, featureList)));
     }
   }
 
   /**
-   * Implementation of AnnotateVideo accepting ByteStrings as contents of input PCollection. Videos
-   * decoded from the ByteStrings are annotated.
+   * Implementation of {@link AnnotateVideo} taking a PCollection of {@link ByteString} and an
+   * optional side input with a context map.
    */
+  @Experimental
   public static class AnnotateVideoFromBytes extends AnnotateVideo<ByteString> {
 
-    public AnnotateVideoFromBytes(
+    protected AnnotateVideoFromBytes(
         PCollectionView<Map<ByteString, VideoContext>> contextSideInput,
         List<Feature> featureList) {
       super(contextSideInput, featureList);
     }
 
-    /** Implementation of ProcessElement. */
     @Override
-    public void processElement(ProcessContext context)
-        throws ExecutionException, InterruptedException {
-      ByteString element = context.element();
-      VideoContext videoContext = null;
-      if (contextSideInput != null) {
-        videoContext = context.sideInput(contextSideInput).get(element);
-      }
-      List<VideoAnnotationResults> videoAnnotationResults =
-          getVideoAnnotationResults(null, element, videoContext);
-      context.output(videoAnnotationResults);
+    public PCollection<List<VideoAnnotationResults>> expand(PCollection<ByteString> input) {
+      return input.apply(ParDo.of(new AnnotateVideoFromBytesFn(contextSideInput, featureList)));
     }
   }
 
   /**
-   * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the
-   * GCS URIs, values - VideoContext objects.
+   * Implementation of {@link AnnotateVideo} taking a PCollection of {@link KV} of {@link String}
+   * and {@link VideoContext}.
    */
-  public static class AnnotateVideoURIWithContext extends AnnotateVideo<KV<String, VideoContext>> {
+  @Experimental
+  public static class AnnotateVideoFromURIWithContext
+      extends AnnotateVideo<KV<String, VideoContext>> {
 
-    public AnnotateVideoURIWithContext(List<Feature> featureList) {
+    protected AnnotateVideoFromURIWithContext(List<Feature> featureList) {
       super(featureList);
     }
 
-    /** ProcessElement implementation. */
     @Override
-    public void processElement(ProcessContext context)
-        throws ExecutionException, InterruptedException {
-      String elementURI = context.element().getKey();
-      VideoContext videoContext = context.element().getValue();
-      List<VideoAnnotationResults> videoAnnotationResults =
-          getVideoAnnotationResults(elementURI, null, videoContext);
-      context.output(videoAnnotationResults);
+    public PCollection<List<VideoAnnotationResults>> expand(
+        PCollection<KV<String, VideoContext>> input) {
+      return input.apply(ParDo.of(new AnnotateVideoURIWithContextFn(featureList)));
     }
   }
 
   /**
-   * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the
-   * ByteString encoded video contents, values - VideoContext objects.
+   * Implementation of {@link AnnotateVideo} taking a PCollection of {@link KV} of {@link
+   * ByteString} and {@link VideoContext}.
    */
-  public static class AnnotateVideoBytesWithContext
+  @Experimental
+  public static class AnnotateVideoFromBytesWithContext
       extends AnnotateVideo<KV<ByteString, VideoContext>> {
 
-    public AnnotateVideoBytesWithContext(List<Feature> featureList) {
+    protected AnnotateVideoFromBytesWithContext(List<Feature> featureList) {
       super(featureList);
     }
 
-    /** ProcessElement implementation. */
     @Override
-    public void processElement(ProcessContext context)
-        throws ExecutionException, InterruptedException {
-      ByteString element = context.element().getKey();
-      VideoContext videoContext = context.element().getValue();
-      List<VideoAnnotationResults> videoAnnotationResults =
-          getVideoAnnotationResults(null, element, videoContext);
-      context.output(videoAnnotationResults);
+    public PCollection<List<VideoAnnotationResults>> expand(
+        PCollection<KV<ByteString, VideoContext>> input) {
+      return input.apply(ParDo.of(new AnnotateVideoBytesWithContextFn(featureList)));
     }
   }
 }
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java
index ad5216d..241d902 100644
--- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java
@@ -16,4 +16,7 @@
  * limitations under the License.
  */
 /** Provides DoFns for integration with Google Cloud AI Video Intelligence service. */
+@Experimental
 package org.apache.beam.sdk.extensions.ml;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
index 57400e4..56473aa 100644
--- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
@@ -52,22 +52,20 @@ public class AnnotateVideoTest {
         .thenReturn(Collections.singletonList(VideoAnnotationResults.newBuilder().build()));
     when(future.get()).thenReturn(response);
     when(serviceClient.annotateVideoAsync(any())).thenReturn(future);
-    VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes =
-        VideoIntelligence.annotateFromBytes(
-            null, Collections.singletonList(Feature.LABEL_DETECTION));
+    AnnotateVideoFromBytesFn annotateVideoFromBytesFn =
+        new AnnotateVideoFromBytesFn(null, Collections.singletonList(Feature.LABEL_DETECTION));
 
-    annotateVideoFromBytes.videoIntelligenceServiceClient = serviceClient;
+    annotateVideoFromBytesFn.videoIntelligenceServiceClient = serviceClient;
     List<VideoAnnotationResults> videoAnnotationResults =
-        annotateVideoFromBytes.getVideoAnnotationResults(TEST_URI, null, null);
+        annotateVideoFromBytesFn.getVideoAnnotationResults(TEST_URI, null, null);
     assertEquals(1, videoAnnotationResults.size());
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void shouldThrowErrorWhenBothInputTypesNull()
       throws ExecutionException, InterruptedException {
-    VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes =
-        VideoIntelligence.annotateFromBytes(
-            null, Collections.singletonList(Feature.LABEL_DETECTION));
-    annotateVideoFromBytes.getVideoAnnotationResults(null, null, null);
+    AnnotateVideoFromBytesFn annotateVideoFromBytesFn =
+        new AnnotateVideoFromBytesFn(null, Collections.singletonList(Feature.LABEL_DETECTION));
+    annotateVideoFromBytesFn.getVideoAnnotationResults(null, null, null);
   }
 }
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java
index b0b74ee..6427225 100644
--- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.ml;
 
-import static org.apache.beam.sdk.extensions.ml.VideoIntelligence.annotateFromURI;
 import static org.junit.Assert.assertEquals;
 
 import com.google.cloud.videointelligence.v1.Feature;
@@ -29,7 +28,6 @@ import java.util.function.Consumer;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
@@ -49,7 +47,7 @@ public class VideoIntelligenceIT {
     PCollection<List<VideoAnnotationResults>> annotationResults =
         testPipeline
             .apply(Create.of(VIDEO_URI))
-            .apply("Annotate video", ParDo.of(annotateFromURI(featureList, null)));
+            .apply("Annotate video", VideoIntelligence.annotateFromURI(featureList, null));
     PAssert.that(annotationResults).satisfies(new VerifyVideoAnnotationResult());
     testPipeline.run().waitUntilFinish();
   }