You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:56 UTC

[31/50] [abbrv] beam git commit: upgrade to gearpump 0.8.4-SNAPSHOT

upgrade to gearpump 0.8.4-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f61822d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f61822d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f61822d4

Branch: refs/heads/master
Commit: f61822d41653def8332cb3cc55140685c3dd75a2
Parents: fed98c8
Author: manuzhang <ow...@gmail.com>
Authored: Wed Jun 7 14:06:43 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jun 17 11:00:29 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                               |  2 +-
 .../runners/gearpump/GearpumpPipelineTranslator.java   |  4 +++-
 .../CreateGearpumpPCollectionViewTranslator.java       |  2 +-
 .../translators/FlattenPCollectionsTranslator.java     |  2 +-
 .../gearpump/translators/GroupByKeyTranslator.java     | 13 +++++++++----
 .../gearpump/translators/functions/DoFnFunction.java   |  1 +
 .../gearpump/translators/io/GearpumpSource.java        |  5 +++--
 .../gearpump/translators/utils/TranslatorUtils.java    |  2 +-
 .../translators/FlattenPCollectionsTranslatorTest.java |  6 +++---
 .../gearpump/translators/io/GearpumpSourceTest.java    |  3 ++-
 10 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 7e39a48..3c98d5e 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -43,7 +43,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <gearpump.version>0.8.3</gearpump.version>
+    <gearpump.version>0.8.4-SNAPSHOT</gearpump.version>
   </properties>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index daf65d9..58b44a3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.values.KV;
@@ -143,7 +144,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    LOG.debug("visiting transform {}", node.getTransform());
+    LOG.info("visiting transform {}", node.getTransform());
     PTransform transform = node.getTransform();
     TransformTranslator translator = getTransformTranslator(transform.getClass());
     if (null == translator) {
@@ -346,6 +347,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
     }
   }
 
+
   private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
       extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
index c7f24a8..d7588c2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
@@ -41,6 +41,6 @@ public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT> implements
     JavaStream<WindowedValue<List<ElemT>>> inputStream =
         context.getInputStream(context.getInput());
     PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
-    context.setOutputStream(view, inputStream);
+    context.setOutputStream(view.getPCollection(), inputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
index 5ca05d8..8cc0058 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
@@ -57,7 +57,7 @@ public class FlattenPCollectionsTranslator<T> implements
           inputStream = inputStream.map(new DummyFunction<T>(), "dummy");
         }
 
-        merged = merged.merge(inputStream, transform.getName());
+        merged = merged.merge(inputStream, 1, transform.getName());
       }
       unique.add(collection);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 7d944a4..8409beb 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -72,7 +72,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     JavaStream<WindowedValue<KV<K, List<V>>>> outputStream = inputStream
         .window(Windows.apply(
             new GearpumpWindowFn(windowFn.isNonMerging()),
-            EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
+            EventTimeTrigger$.MODULE$, Discarding$.MODULE$, windowFn.toString()))
         .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
         .map(new KeyedByTimestamp<K, V>(windowFn, timestampCombiner), "keyed_by_timestamp")
         .fold(new Merge<>(windowFn, timestampCombiner), "merge")
@@ -85,7 +85,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
    * A transform used internally to translate Beam's Window to Gearpump's Window.
    */
   protected static class GearpumpWindowFn<T, W extends BoundedWindow>
-      implements WindowFunction<WindowedValue<T>>, Serializable {
+      implements WindowFunction, Serializable {
 
     private final boolean isNonMerging;
 
@@ -94,9 +94,14 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
 
     @Override
-    public Window[] apply(Context<WindowedValue<T>> context) {
+    public <T> Window[]  apply(Context<T> context) {
       try {
-        return toGearpumpWindows(context.element().getWindows().toArray(new BoundedWindow[0]));
+        Object element = context.element();
+        if (element instanceof TranslatorUtils.RawUnionValue) {
+          element = ((TranslatorUtils.RawUnionValue) element).getValue();
+        }
+        return toGearpumpWindows(((WindowedValue<T>) element).getWindows()
+            .toArray(new BoundedWindow[0]));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 6e4fbeb..e2777df 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -145,6 +145,7 @@ public class DoFnFunction<InputT, OutputT> extends
             Object emptyValue = WindowedValue.of(
                 Lists.newArrayList(), value.getTimestamp(), sideInputWindow, value.getPane());
             sideInputReader.addSideInputValue(sideInput, (WindowedValue<Iterable<?>>) emptyValue);
+            System.out.println(sideInput + " in " + sideInputWindow.toString() + " not ready");
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 60f319d..6637a9b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 
+import org.apache.gearpump.DefaultMessage;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.streaming.source.DataSource;
 import org.apache.gearpump.streaming.source.Watermark;
@@ -77,9 +78,9 @@ public abstract class GearpumpSource<T> implements DataSource {
       if (available) {
         T data = reader.getCurrent();
         org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
-        message = Message.apply(
+        message = new DefaultMessage(
             WindowedValue.timestampedValueInGlobalWindow(data, timestamp),
-            timestamp.getMillis());
+            TranslatorUtils.jodaTimeToJava8Time(timestamp));
       }
       available = reader.advance();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index 282f261..83fc6e6 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -76,7 +76,7 @@ public class TranslatorUtils {
       JavaStream<WindowedValue<List<?>>> sideInputStream = context.getInputStream(
           tagToSideInput.getValue());
       mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>(
-          tagToSideInput.getKey()), "map_to_RawUnionValue"), "merge_to_MainStream");
+          tagToSideInput.getKey()), "map_to_RawUnionValue"), 1, "merge_to_MainStream");
     }
     return mainStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
index ac12fa4..1262177 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
@@ -113,8 +113,8 @@ public class FlattenPCollectionsTranslatorTest {
     when(translationContext.getInputs()).thenReturn(inputs);
     when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
     when(translationContext.getInputStream(mockCollection2)).thenReturn(javaStream2);
-    when(javaStream1.merge(javaStream2, transformName)).thenReturn(mergedStream);
-    when(javaStream2.merge(javaStream1, transformName)).thenReturn(mergedStream);
+    when(javaStream1.merge(javaStream2, 1, transformName)).thenReturn(mergedStream);
+    when(javaStream2.merge(javaStream1, 1, transformName)).thenReturn(mergedStream);
 
     when(translationContext.getOutput()).thenReturn(output);
 
@@ -144,6 +144,6 @@ public class FlattenPCollectionsTranslatorTest {
 
     translator.translate(transform, translationContext);
     verify(javaStream1).map(any(MapFunction.class), eq("dummy"));
-    verify(javaStream1).merge(any(JavaStream.class), eq(transformName));
+    verify(javaStream1).merge(any(JavaStream.class), eq(1), eq(transformName));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
index b244484..4490737 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.gearpump.DefaultMessage;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.streaming.source.Watermark;
 import org.junit.Assert;
@@ -77,7 +78,7 @@ public class GearpumpSourceTest {
       Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark());
 
       Message expectedMsg =
-          Message.apply(
+          new DefaultMessage(
               WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()),
               value.getTimestamp().getMillis());
       Message message = sourceForTest.read();