You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:28 UTC

[03/50] [abbrv] beam git commit: [BEAM-79] Add SideInput support for GearpumpRunner

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index 656fc6a..b8a5233 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -18,14 +18,28 @@
 
 package org.apache.beam.runners.gearpump.translators.utils;
 
+import com.google.common.collect.Lists;
+
 import java.time.Instant;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.beam.runners.gearpump.translators.TranslationContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.window.impl.Window;
 
 
+
 /**
  * Utility methods for translators.
  */
@@ -52,4 +66,137 @@ public class TranslatorUtils {
       throw new RuntimeException("unknown window " + window.getClass().getName());
     }
   }
+
+  public static <InputT> JavaStream<RawUnionValue> withSideInputStream(
+      TranslationContext context,
+      JavaStream<WindowedValue<InputT>> inputStream,
+      Map<String, PCollectionView<?>> tagsToSideInputs) {
+    JavaStream<RawUnionValue> mainStream =
+        inputStream.map(new ToRawUnionValue<InputT>("0"), "map_to_RawUnionValue");
+
+    for (Map.Entry<String, PCollectionView<?>> tagToSideInput: tagsToSideInputs.entrySet()) {
+      // actually JavaStream<WindowedValue<List<?>>>
+      // check CreatePCollectionViewTranslator
+      JavaStream<WindowedValue<Object>> sideInputStream = context.getInputStream(
+          tagToSideInput.getValue());
+      mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>(
+          tagToSideInput.getKey()), "map_to_RawUnionValue"), "merge_to_MainStream");
+    }
+    return mainStream;
+  }
+
+  public static Map<String, PCollectionView<?>> getTagsToSideInputs(
+      Collection<PCollectionView<?>> sideInputs) {
+    Map<String, PCollectionView<?>> tagsToSideInputs = new HashMap<>();
+    // tag 0 is reserved for main input
+    int tag = 1;
+    for (PCollectionView<?> sideInput: sideInputs) {
+      tagsToSideInputs.put(tag + "", sideInput);
+      tag++;
+    }
+    return tagsToSideInputs;
+  }
+
+  public static JavaStream<List<RawUnionValue>> toList(JavaStream<RawUnionValue> stream) {
+    return stream.fold(new FoldFunction<RawUnionValue, List<RawUnionValue>>() {
+
+      @Override
+      public List<RawUnionValue> init() {
+        return Lists.newArrayList();
+      }
+
+      @Override
+      public List<RawUnionValue> fold(List<RawUnionValue> accumulator,
+          RawUnionValue rawUnionValue) {
+        accumulator.add(rawUnionValue);
+        return accumulator;
+      }
+    }, "fold_to_iterable");
+  }
+
+  /**
+   * Converts @link{RawUnionValue} to @link{WindowedValue}.
+   */
+  public static class FromRawUnionValue<OutputT> extends
+      MapFunction<RawUnionValue, WindowedValue<OutputT>> {
+
+    private static final long serialVersionUID = -4764968219713478955L;
+
+    @Override
+    public WindowedValue<OutputT> map(RawUnionValue value) {
+      return (WindowedValue<OutputT>) value.getValue();
+    }
+  }
+
+  private static class ToRawUnionValue<T> extends
+      MapFunction<WindowedValue<T>, RawUnionValue> {
+
+    private static final long serialVersionUID = 8648852871014813583L;
+    private final String tag;
+
+    ToRawUnionValue(String tag) {
+      this.tag = tag;
+    }
+
+    @Override
+    public RawUnionValue map(WindowedValue<T> windowedValue) {
+      return new RawUnionValue(tag, windowedValue);
+    }
+  }
+
+
+  /**
+   * This is copied from org.apache.beam.sdk.transforms.join.RawUnionValue.
+   */
+  public static class RawUnionValue {
+    private final String unionTag;
+    private final Object value;
+
+    /**
+     * Constructs a partial union from the given union tag and value.
+     */
+    public RawUnionValue(String unionTag, Object value) {
+      this.unionTag = unionTag;
+      this.value = value;
+    }
+
+    public String getUnionTag() {
+      return unionTag;
+    }
+
+    public Object getValue() {
+      return value;
+    }
+
+    @Override
+    public String toString() {
+      return unionTag + ":" + value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      RawUnionValue that = (RawUnionValue) o;
+
+      if (unionTag != that.unionTag) {
+        return false;
+      }
+      return value != null ? value.equals(that.value) : that.value == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = unionTag.hashCode();
+      result = 31 * result + value.hashCode();
+      return result;
+    }
+  }
+
 }