You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:28 UTC
[03/50] [abbrv] beam git commit: [BEAM-79] Add SideInput support for
GearpumpRunner
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index 656fc6a..b8a5233 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -18,14 +18,28 @@
package org.apache.beam.runners.gearpump.translators.utils;
+import com.google.common.collect.Lists;
+
import java.time.Instant;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.gearpump.translators.TranslationContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.dsl.window.impl.Window;
+
/**
* Utility methods for translators.
*/
@@ -52,4 +66,137 @@ public class TranslatorUtils {
throw new RuntimeException("unknown window " + window.getClass().getName());
}
}
+
+  /**
+   * Tags the main input and every side input and merges them into a single stream.
+   *
+   * <p>Elements of {@code inputStream} are wrapped with the tag {@code "0"}; elements of each
+   * side input stream are wrapped with the key that side input has in {@code tagsToSideInputs}.
+   *
+   * @param context translation context used to look up the stream of each side input view
+   * @param inputStream the main input stream
+   * @param tagsToSideInputs side input views keyed by their union tag
+   * @return the tagged main stream merged with all tagged side input streams
+   */
+  public static <InputT> JavaStream<RawUnionValue> withSideInputStream(
+      TranslationContext context,
+      JavaStream<WindowedValue<InputT>> inputStream,
+      Map<String, PCollectionView<?>> tagsToSideInputs) {
+    JavaStream<RawUnionValue> merged =
+        inputStream.map(new ToRawUnionValue<InputT>("0"), "map_to_RawUnionValue");
+
+    for (Map.Entry<String, PCollectionView<?>> entry : tagsToSideInputs.entrySet()) {
+      String sideInputTag = entry.getKey();
+      PCollectionView<?> view = entry.getValue();
+
+      // actually JavaStream<WindowedValue<List<?>>>
+      // check CreatePCollectionViewTranslator
+      JavaStream<WindowedValue<Object>> sideInputStream = context.getInputStream(view);
+      JavaStream<RawUnionValue> taggedSideInput =
+          sideInputStream.map(new ToRawUnionValue<>(sideInputTag), "map_to_RawUnionValue");
+      merged = merged.merge(taggedSideInput, "merge_to_MainStream");
+    }
+    return merged;
+  }
+
+  /**
+   * Assigns a union tag to each side input view.
+   *
+   * <p>Tags are the decimal strings {@code "1"}, {@code "2"}, ... handed out in the iteration
+   * order of {@code sideInputs}.
+   *
+   * @param sideInputs the side input views to tag
+   * @return a new mutable map from tag to side input view
+   */
+  public static Map<String, PCollectionView<?>> getTagsToSideInputs(
+      Collection<PCollectionView<?>> sideInputs) {
+    Map<String, PCollectionView<?>> taggedViews = new HashMap<>();
+    // tag 0 is reserved for main input
+    int nextTag = 1;
+    for (PCollectionView<?> view : sideInputs) {
+      taggedViews.put(String.valueOf(nextTag++), view);
+    }
+    return taggedViews;
+  }
+
+  /**
+   * Folds the elements of a stream into a list.
+   *
+   * <p>The fold starts from a fresh empty list and appends every element to it.
+   *
+   * @param stream the stream of tagged values to collect
+   * @return a stream of lists produced by the fold
+   */
+  public static JavaStream<List<RawUnionValue>> toList(JavaStream<RawUnionValue> stream) {
+    FoldFunction<RawUnionValue, List<RawUnionValue>> appendToList =
+        new FoldFunction<RawUnionValue, List<RawUnionValue>>() {
+
+          @Override
+          public List<RawUnionValue> init() {
+            return Lists.newArrayList();
+          }
+
+          @Override
+          public List<RawUnionValue> fold(List<RawUnionValue> collected,
+              RawUnionValue element) {
+            collected.add(element);
+            return collected;
+          }
+        };
+    return stream.fold(appendToList, "fold_to_iterable");
+  }
+
+  /**
+   * Converts {@link RawUnionValue} to {@link WindowedValue}.
+   *
+   * <p>The union tag is discarded; only the wrapped value is returned.
+   *
+   * @param <OutputT> element type of the unwrapped {@link WindowedValue}
+   */
+  public static class FromRawUnionValue<OutputT> extends
+      MapFunction<RawUnionValue, WindowedValue<OutputT>> {
+
+    private static final long serialVersionUID = -4764968219713478955L;
+
+    /**
+     * Unwraps the value carried by {@code value}.
+     *
+     * @param value the tagged value to unwrap
+     * @return the wrapped object, cast to {@code WindowedValue<OutputT>}
+     */
+    @Override
+    public WindowedValue<OutputT> map(RawUnionValue value) {
+      // RawUnionValue stores a plain Object, so this cast cannot be checked at runtime;
+      // callers must only feed values that were wrapped from a WindowedValue<OutputT>.
+      @SuppressWarnings("unchecked")
+      WindowedValue<OutputT> unwrapped = (WindowedValue<OutputT>) value.getValue();
+      return unwrapped;
+    }
+  }
+
+  /**
+   * Wraps each {@link WindowedValue} in a {@link RawUnionValue} carrying a fixed tag.
+   *
+   * @param <T> element type of the incoming {@link WindowedValue}
+   */
+  private static class ToRawUnionValue<T> extends
+      MapFunction<WindowedValue<T>, RawUnionValue> {
+
+    private static final long serialVersionUID = 8648852871014813583L;
+
+    /** Tag attached to every value produced by this function. */
+    private final String tag;
+
+    ToRawUnionValue(String tag) {
+      this.tag = tag;
+    }
+
+    /**
+     * Pairs {@code element} with this function's tag.
+     */
+    @Override
+    public RawUnionValue map(WindowedValue<T> element) {
+      return new RawUnionValue(tag, element);
+    }
+  }
+
+
+  /**
+   * A value paired with a string tag identifying which input it belongs to.
+   *
+   * <p>This is copied from org.apache.beam.sdk.transforms.join.RawUnionValue.
+   */
+  public static class RawUnionValue {
+    private final String unionTag;
+    private final Object value;
+
+    /**
+     * Constructs a partial union from the given union tag and value.
+     *
+     * @param unionTag tag identifying the input the value came from
+     * @param value the wrapped value; may be {@code null}
+     */
+    public RawUnionValue(String unionTag, Object value) {
+      this.unionTag = unionTag;
+      this.value = value;
+    }
+
+    /** Returns the tag this value was constructed with. */
+    public String getUnionTag() {
+      return unionTag;
+    }
+
+    /** Returns the wrapped value, possibly {@code null}. */
+    public Object getValue() {
+      return value;
+    }
+
+    /** Returns {@code unionTag + ":" + value}. */
+    @Override
+    public String toString() {
+      return unionTag + ":" + value;
+    }
+
+    /**
+     * Two instances are equal when both their tags and their values are equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      RawUnionValue that = (RawUnionValue) o;
+
+      // Compare tags by content: they are built at runtime, so two equal tags are
+      // generally distinct String objects and a reference comparison would fail.
+      if (unionTag != null ? !unionTag.equals(that.unionTag) : that.unionTag != null) {
+        return false;
+      }
+      return value != null ? value.equals(that.value) : that.value == null;
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}; a {@code null} tag or value
+     * contributes 0.
+     */
+    @Override
+    public int hashCode() {
+      int result = unionTag != null ? unionTag.hashCode() : 0;
+      result = 31 * result + (value != null ? value.hashCode() : 0);
+      return result;
+    }
+  }
+
}