You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/04/20 11:15:30 UTC
[beam] 01/01: [BEAM-9147] Make VideoIntelligence use PTransform on user-facing API
This is an automated email from the ASF dual-hosted git repository.
mwalenia pushed a commit to branch BEAM-9147-use-ptransforms
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 31117953cb7ca7728099be0d1a1c7410dbc14062
Author: Michal Walenia <mi...@polidea.com>
AuthorDate: Mon Apr 20 13:14:48 2020 +0200
[BEAM-9147] Make VideoIntelligence use PTransform on user-facing API
---
.../beam/sdk/extensions/ml/AnnotateVideo.java | 73 +++-----------
.../ml/AnnotateVideoBytesWithContextFn.java | 50 ++++++++++
.../{AnnotateVideo.java => AnnotateVideoFn.java} | 10 +-
.../extensions/ml/AnnotateVideoFromBytesFn.java | 55 +++++++++++
.../sdk/extensions/ml/AnnotateVideoFromURIFn.java | 54 ++++++++++
.../ml/AnnotateVideoURIWithContextFn.java | 49 +++++++++
.../beam/sdk/extensions/ml/VideoIntelligence.java | 109 +++++++++------------
.../beam/sdk/extensions/ml/package-info.java | 3 +
.../beam/sdk/extensions/ml/AnnotateVideoTest.java | 16 ++-
.../sdk/extensions/ml/VideoIntelligenceIT.java | 4 +-
10 files changed, 285 insertions(+), 138 deletions(-)
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
index 56e8638..3cedb62 100644
--- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
@@ -17,87 +17,44 @@
*/
package org.apache.beam.sdk.extensions.ml;
-import com.google.api.gax.longrunning.OperationFuture;
-import com.google.cloud.videointelligence.v1.AnnotateVideoProgress;
-import com.google.cloud.videointelligence.v1.AnnotateVideoRequest;
-import com.google.cloud.videointelligence.v1.AnnotateVideoResponse;
import com.google.cloud.videointelligence.v1.Feature;
import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
import com.google.cloud.videointelligence.v1.VideoContext;
-import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
/**
- * Base class for Video Intelligence transform.
+ * Base class for VideoIntelligence PTransform.
*
- * @param <T> Class of input data being passed in - either ByteString - video data encoded into.
- * String or String - a GCS URI of the video to be annotated.
+ * @param <T> Type of input PCollection contents.
*/
-public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> {
-
+@Experimental
+public abstract class AnnotateVideo<T>
+ extends PTransform<PCollection<T>, PCollection<List<VideoAnnotationResults>>> {
protected final PCollectionView<Map<T, VideoContext>> contextSideInput;
protected final List<Feature> featureList;
- VideoIntelligenceServiceClient videoIntelligenceServiceClient;
- public AnnotateVideo(
+ protected AnnotateVideo(
PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) {
this.contextSideInput = contextSideInput;
this.featureList = featureList;
}
- public AnnotateVideo(List<Feature> featureList) {
- contextSideInput = null;
+ protected AnnotateVideo(List<Feature> featureList) {
+ this.contextSideInput = null;
this.featureList = featureList;
}
- @StartBundle
- public void startBundle() throws IOException {
- videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create();
- }
-
- @Teardown
- public void teardown() {
- videoIntelligenceServiceClient.close();
- }
-
/**
- * Call the Video Intelligence Cloud AI service and return annotation results.
+ * To be implemented based on input PCollection's content type.
*
- * @param elementURI This or elementContents is required. GCS address of video to be annotated
- * @param elementContents this or elementURI is required. Hex-encoded contents of video to be
- * annotated
- * @param videoContext Optional context for video annotation.
+ * @param input PCollection of elements to be annotated
* @return PCollection of lists of video annotation results
*/
- List<VideoAnnotationResults> getVideoAnnotationResults(
- String elementURI, ByteString elementContents, VideoContext videoContext)
- throws InterruptedException, ExecutionException {
- AnnotateVideoRequest.Builder requestBuilder =
- AnnotateVideoRequest.newBuilder().addAllFeatures(featureList);
- if (elementURI != null) {
- requestBuilder.setInputUri(elementURI);
- } else if (elementContents != null) {
- requestBuilder.setInputContent(elementContents);
- } else {
- throw new IllegalArgumentException("Either elementURI or elementContents should be non-null");
- }
- if (videoContext != null) {
- requestBuilder.setVideoContext(videoContext);
- }
- AnnotateVideoRequest annotateVideoRequest = requestBuilder.build();
- OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> annotateVideoAsync =
- videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest);
- return annotateVideoAsync.get().getAnnotationResultsList();
- }
-
- /** Process element implementation required. */
- @ProcessElement
- public abstract void processElement(ProcessContext context)
- throws ExecutionException, InterruptedException;
+ @Override
+ public abstract PCollection<List<VideoAnnotationResults>> expand(PCollection<T> input);
}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java
new file mode 100644
index 0000000..c8edcd1
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoContext;
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the
+ * ByteString encoded video contents, values - VideoContext objects.
+ */
+@Experimental
+class AnnotateVideoBytesWithContextFn extends AnnotateVideoFn<KV<ByteString, VideoContext>> {
+
+ public AnnotateVideoBytesWithContextFn(List<Feature> featureList) {
+ super(featureList);
+ }
+
+ /** ProcessElement implementation. */
+ @Override
+ public void processElement(ProcessContext context)
+ throws ExecutionException, InterruptedException {
+ ByteString element = context.element().getKey();
+ VideoContext videoContext = context.element().getValue();
+ List<VideoAnnotationResults> videoAnnotationResults =
+ getVideoAnnotationResults(null, element, videoContext);
+ context.output(videoAnnotationResults);
+ }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java
similarity index 93%
copy from sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
copy to sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java
index 56e8638..3cf0c30 100644
--- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java
@@ -30,28 +30,30 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;
/**
- * Base class for Video Intelligence transform.
+ * Base class for DoFns used in VideoIntelligence transforms.
*
* @param <T> Class of input data being passed in - either ByteString (video contents encoded as
* bytes) or String (a GCS URI of the video to be annotated).
*/
-public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> {
+@Experimental
+abstract class AnnotateVideoFn<T> extends DoFn<T, List<VideoAnnotationResults>> {
protected final PCollectionView<Map<T, VideoContext>> contextSideInput;
protected final List<Feature> featureList;
VideoIntelligenceServiceClient videoIntelligenceServiceClient;
- public AnnotateVideo(
+ public AnnotateVideoFn(
PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) {
this.contextSideInput = contextSideInput;
this.featureList = featureList;
}
- public AnnotateVideo(List<Feature> featureList) {
+ public AnnotateVideoFn(List<Feature> featureList) {
contextSideInput = null;
this.featureList = featureList;
}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java
new file mode 100644
index 0000000..20aa083
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoContext;
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Implementation of AnnotateVideoFn accepting ByteStrings as contents of input PCollection. Videos
+ * decoded from the ByteStrings are annotated.
+ */
+@Experimental
+class AnnotateVideoFromBytesFn extends AnnotateVideoFn<ByteString> {
+
+ public AnnotateVideoFromBytesFn(
+ PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) {
+ super(contextSideInput, featureList);
+ }
+
+ /** Implementation of ProcessElement. */
+ @Override
+ public void processElement(ProcessContext context)
+ throws ExecutionException, InterruptedException {
+ ByteString element = context.element();
+ VideoContext videoContext = null;
+ if (contextSideInput != null) {
+ videoContext = context.sideInput(contextSideInput).get(element);
+ }
+ List<VideoAnnotationResults> videoAnnotationResults =
+ getVideoAnnotationResults(null, element, videoContext);
+ context.output(videoAnnotationResults);
+ }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java
new file mode 100644
index 0000000..5dfea0c
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoContext;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Implementation of AnnotateVideoFn accepting Strings as contents of input PCollection. Annotates
+ * videos found on GCS based on URIs from input PCollection.
+ */
+@Experimental
+class AnnotateVideoFromURIFn extends AnnotateVideoFn<String> {
+
+ public AnnotateVideoFromURIFn(
+ PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) {
+ super(contextSideInput, featureList);
+ }
+
+ /** ProcessElement implementation. */
+ @Override
+ public void processElement(ProcessContext context)
+ throws ExecutionException, InterruptedException {
+ String elementURI = context.element();
+ VideoContext videoContext = null;
+ if (contextSideInput != null) {
+ videoContext = context.sideInput(contextSideInput).get(elementURI);
+ }
+ List<VideoAnnotationResults> annotationResultsList =
+ getVideoAnnotationResults(elementURI, null, videoContext);
+ context.output(annotationResultsList);
+ }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java
new file mode 100644
index 0000000..a165d5a
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoContext;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the
+ * GCS URIs, values - VideoContext objects.
+ */
+@Experimental
+class AnnotateVideoURIWithContextFn extends AnnotateVideoFn<KV<String, VideoContext>> {
+
+ public AnnotateVideoURIWithContextFn(List<Feature> featureList) {
+ super(featureList);
+ }
+
+ /** ProcessElement implementation. */
+ @Override
+ public void processElement(ProcessContext context)
+ throws ExecutionException, InterruptedException {
+ String elementURI = context.element().getKey();
+ VideoContext videoContext = context.element().getValue();
+ List<VideoAnnotationResults> videoAnnotationResults =
+ getVideoAnnotationResults(elementURI, null, videoContext);
+ context.output(videoAnnotationResults);
+ }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
index 0f447da..5fa25d6 100644
--- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
@@ -23,8 +23,10 @@ import com.google.cloud.videointelligence.v1.VideoContext;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
/**
@@ -37,6 +39,7 @@ import org.apache.beam.sdk.values.PCollectionView;
*
* <p>Service account with proper permissions is required to use these transforms.
*/
+@Experimental
public class VideoIntelligence {
/**
@@ -44,11 +47,11 @@ public class VideoIntelligence {
*
* @param featureList List of features to be annotated
* @param contextSideInput Optional side input with map of URIs to contexts
- * @return DoFn performing the necessary operations
+ * @return PTransform performing the necessary operations
*/
- public static AnnotateVideoFromURI annotateFromURI(
+ public static AnnotateVideoFromUri annotateFromURI(
List<Feature> featureList, PCollectionView<Map<String, VideoContext>> contextSideInput) {
- return new AnnotateVideoFromURI(contextSideInput, featureList);
+ return new AnnotateVideoFromUri(contextSideInput, featureList);
}
/**
@@ -56,7 +59,7 @@ public class VideoIntelligence {
*
* @param featureList List of features to be annotated
* @param contextSideInput Optional side input with map of ByteStrings to contexts
- * @return DoFn performing the necessary operations
+ * @return PTransform performing the necessary operations
*/
public static AnnotateVideoFromBytes annotateFromBytes(
PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) {
@@ -67,118 +70,96 @@ public class VideoIntelligence {
* Annotates videos from key-value pairs of GCS URI and VideoContext.
*
* @param featureList List of features to be annotated
- * @return DoFn performing the necessary operations
+ * @return PTransform performing the necessary operations
*/
- public static AnnotateVideoURIWithContext annotateFromUriWithContext(List<Feature> featureList) {
- return new AnnotateVideoURIWithContext(featureList);
+ public static AnnotateVideoFromURIWithContext annotateFromUriWithContext(
+ List<Feature> featureList) {
+ return new AnnotateVideoFromURIWithContext(featureList);
}
/**
* Annotates videos from key-value pairs of ByteStrings and VideoContext.
*
* @param featureList List of features to be annotated
- * @return DoFn performing the necessary operations
+ * @return PTransform performing the necessary operations
*/
- public static AnnotateVideoBytesWithContext annotateFromBytesWithContext(
+ public static AnnotateVideoFromBytesWithContext annotateFromBytesWithContext(
List<Feature> featureList) {
- return new AnnotateVideoBytesWithContext(featureList);
+ return new AnnotateVideoFromBytesWithContext(featureList);
}
/**
- * Implementation of AnnotateVideo accepting Strings as contents of input PCollection. Annotates
- * videos found on GCS based on URIs from input PCollection.
+ * Implementation of {@link AnnotateVideo} taking a PCollection of {@link String} and an optional
+ * side input with a context map.
*/
- public static class AnnotateVideoFromURI extends AnnotateVideo<String> {
+ @Experimental
+ public static class AnnotateVideoFromUri extends AnnotateVideo<String> {
- public AnnotateVideoFromURI(
+ protected AnnotateVideoFromUri(
PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) {
super(contextSideInput, featureList);
}
- /** ProcessElement implementation. */
@Override
- public void processElement(ProcessContext context)
- throws ExecutionException, InterruptedException {
- String elementURI = context.element();
- VideoContext videoContext = null;
- if (contextSideInput != null) {
- videoContext = context.sideInput(contextSideInput).get(elementURI);
- }
- List<VideoAnnotationResults> annotationResultsList =
- getVideoAnnotationResults(elementURI, null, videoContext);
- context.output(annotationResultsList);
+ public PCollection<List<VideoAnnotationResults>> expand(PCollection<String> input) {
+ return input.apply(ParDo.of(new AnnotateVideoFromURIFn(contextSideInput, featureList)));
}
}
/**
- * Implementation of AnnotateVideo accepting ByteStrings as contents of input PCollection. Videos
- * decoded from the ByteStrings are annotated.
+ * Implementation of {@link AnnotateVideo} taking a PCollection of {@link ByteString} and an
+ * optional side input with a context map.
*/
+ @Experimental
public static class AnnotateVideoFromBytes extends AnnotateVideo<ByteString> {
- public AnnotateVideoFromBytes(
+ protected AnnotateVideoFromBytes(
PCollectionView<Map<ByteString, VideoContext>> contextSideInput,
List<Feature> featureList) {
super(contextSideInput, featureList);
}
- /** Implementation of ProcessElement. */
@Override
- public void processElement(ProcessContext context)
- throws ExecutionException, InterruptedException {
- ByteString element = context.element();
- VideoContext videoContext = null;
- if (contextSideInput != null) {
- videoContext = context.sideInput(contextSideInput).get(element);
- }
- List<VideoAnnotationResults> videoAnnotationResults =
- getVideoAnnotationResults(null, element, videoContext);
- context.output(videoAnnotationResults);
+ public PCollection<List<VideoAnnotationResults>> expand(PCollection<ByteString> input) {
+ return input.apply(ParDo.of(new AnnotateVideoFromBytesFn(contextSideInput, featureList)));
}
}
/**
- * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the
- * GCS URIs, values - VideoContext objects.
+ * Implementation of {@link AnnotateVideo} taking a PCollection of {@link KV} of {@link String}
+ * and {@link VideoContext}.
*/
- public static class AnnotateVideoURIWithContext extends AnnotateVideo<KV<String, VideoContext>> {
+ @Experimental
+ public static class AnnotateVideoFromURIWithContext
+ extends AnnotateVideo<KV<String, VideoContext>> {
- public AnnotateVideoURIWithContext(List<Feature> featureList) {
+ protected AnnotateVideoFromURIWithContext(List<Feature> featureList) {
super(featureList);
}
- /** ProcessElement implementation. */
@Override
- public void processElement(ProcessContext context)
- throws ExecutionException, InterruptedException {
- String elementURI = context.element().getKey();
- VideoContext videoContext = context.element().getValue();
- List<VideoAnnotationResults> videoAnnotationResults =
- getVideoAnnotationResults(elementURI, null, videoContext);
- context.output(videoAnnotationResults);
+ public PCollection<List<VideoAnnotationResults>> expand(
+ PCollection<KV<String, VideoContext>> input) {
+ return input.apply(ParDo.of(new AnnotateVideoURIWithContextFn(featureList)));
}
}
/**
- * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the
- * ByteString encoded video contents, values - VideoContext objects.
+ * Implementation of {@link AnnotateVideo} taking a PCollection of {@link KV} of {@link
+ * ByteString} and {@link VideoContext}.
*/
- public static class AnnotateVideoBytesWithContext
+ @Experimental
+ public static class AnnotateVideoFromBytesWithContext
extends AnnotateVideo<KV<ByteString, VideoContext>> {
- public AnnotateVideoBytesWithContext(List<Feature> featureList) {
+ protected AnnotateVideoFromBytesWithContext(List<Feature> featureList) {
super(featureList);
}
- /** ProcessElement implementation. */
@Override
- public void processElement(ProcessContext context)
- throws ExecutionException, InterruptedException {
- ByteString element = context.element().getKey();
- VideoContext videoContext = context.element().getValue();
- List<VideoAnnotationResults> videoAnnotationResults =
- getVideoAnnotationResults(null, element, videoContext);
- context.output(videoAnnotationResults);
+ public PCollection<List<VideoAnnotationResults>> expand(
+ PCollection<KV<ByteString, VideoContext>> input) {
+ return input.apply(ParDo.of(new AnnotateVideoBytesWithContextFn(featureList)));
}
}
}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java
index ad5216d..241d902 100644
--- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java
@@ -16,4 +16,7 @@
* limitations under the License.
*/
/** Provides DoFns for integration with Google Cloud AI Video Intelligence service. */
+@Experimental
package org.apache.beam.sdk.extensions.ml;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
index 57400e4..56473aa 100644
--- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
@@ -52,22 +52,20 @@ public class AnnotateVideoTest {
.thenReturn(Collections.singletonList(VideoAnnotationResults.newBuilder().build()));
when(future.get()).thenReturn(response);
when(serviceClient.annotateVideoAsync(any())).thenReturn(future);
- VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes =
- VideoIntelligence.annotateFromBytes(
- null, Collections.singletonList(Feature.LABEL_DETECTION));
+ AnnotateVideoFromBytesFn annotateVideoFromBytesFn =
+ new AnnotateVideoFromBytesFn(null, Collections.singletonList(Feature.LABEL_DETECTION));
- annotateVideoFromBytes.videoIntelligenceServiceClient = serviceClient;
+ annotateVideoFromBytesFn.videoIntelligenceServiceClient = serviceClient;
List<VideoAnnotationResults> videoAnnotationResults =
- annotateVideoFromBytes.getVideoAnnotationResults(TEST_URI, null, null);
+ annotateVideoFromBytesFn.getVideoAnnotationResults(TEST_URI, null, null);
assertEquals(1, videoAnnotationResults.size());
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowErrorWhenBothInputTypesNull()
throws ExecutionException, InterruptedException {
- VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes =
- VideoIntelligence.annotateFromBytes(
- null, Collections.singletonList(Feature.LABEL_DETECTION));
- annotateVideoFromBytes.getVideoAnnotationResults(null, null, null);
+ AnnotateVideoFromBytesFn annotateVideoFromBytesFn =
+ new AnnotateVideoFromBytesFn(null, Collections.singletonList(Feature.LABEL_DETECTION));
+ annotateVideoFromBytesFn.getVideoAnnotationResults(null, null, null);
}
}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java
index b0b74ee..6427225 100644
--- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.ml;
-import static org.apache.beam.sdk.extensions.ml.VideoIntelligence.annotateFromURI;
import static org.junit.Assert.assertEquals;
import com.google.cloud.videointelligence.v1.Feature;
@@ -29,7 +28,6 @@ import java.util.function.Consumer;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
@@ -49,7 +47,7 @@ public class VideoIntelligenceIT {
PCollection<List<VideoAnnotationResults>> annotationResults =
testPipeline
.apply(Create.of(VIDEO_URI))
- .apply("Annotate video", ParDo.of(annotateFromURI(featureList, null)));
+ .apply("Annotate video", VideoIntelligence.annotateFromURI(featureList, null));
PAssert.that(annotationResults).satisfies(new VerifyVideoAnnotationResult());
testPipeline.run().waitUntilFinish();
}