You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:56 UTC
[31/50] [abbrv] beam git commit: upgrade to gearpump 0.8.4-SNAPSHOT
upgrade to gearpump 0.8.4-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f61822d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f61822d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f61822d4
Branch: refs/heads/master
Commit: f61822d41653def8332cb3cc55140685c3dd75a2
Parents: fed98c8
Author: manuzhang <ow...@gmail.com>
Authored: Wed Jun 7 14:06:43 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jun 17 11:00:29 2017 +0800
----------------------------------------------------------------------
runners/gearpump/pom.xml | 2 +-
.../runners/gearpump/GearpumpPipelineTranslator.java | 4 +++-
.../CreateGearpumpPCollectionViewTranslator.java | 2 +-
.../translators/FlattenPCollectionsTranslator.java | 2 +-
.../gearpump/translators/GroupByKeyTranslator.java | 13 +++++++++----
.../gearpump/translators/functions/DoFnFunction.java | 1 +
.../gearpump/translators/io/GearpumpSource.java | 5 +++--
.../gearpump/translators/utils/TranslatorUtils.java | 2 +-
.../translators/FlattenPCollectionsTranslatorTest.java | 6 +++---
.../gearpump/translators/io/GearpumpSourceTest.java | 3 ++-
10 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 7e39a48..3c98d5e 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -43,7 +43,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <gearpump.version>0.8.3</gearpump.version>
+ <gearpump.version>0.8.4-SNAPSHOT</gearpump.version>
</properties>
<profiles>
http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index daf65d9..58b44a3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.values.KV;
@@ -143,7 +144,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- LOG.debug("visiting transform {}", node.getTransform());
+ LOG.info("visiting transform {}", node.getTransform());
PTransform transform = node.getTransform();
TransformTranslator translator = getTransformTranslator(transform.getClass());
if (null == translator) {
@@ -346,6 +347,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
}
}
+
private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
index c7f24a8..d7588c2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
@@ -41,6 +41,6 @@ public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT> implements
JavaStream<WindowedValue<List<ElemT>>> inputStream =
context.getInputStream(context.getInput());
PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
- context.setOutputStream(view, inputStream);
+ context.setOutputStream(view.getPCollection(), inputStream);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
index 5ca05d8..8cc0058 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
@@ -57,7 +57,7 @@ public class FlattenPCollectionsTranslator<T> implements
inputStream = inputStream.map(new DummyFunction<T>(), "dummy");
}
- merged = merged.merge(inputStream, transform.getName());
+ merged = merged.merge(inputStream, 1, transform.getName());
}
unique.add(collection);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 7d944a4..8409beb 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -72,7 +72,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
JavaStream<WindowedValue<KV<K, List<V>>>> outputStream = inputStream
.window(Windows.apply(
new GearpumpWindowFn(windowFn.isNonMerging()),
- EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
+ EventTimeTrigger$.MODULE$, Discarding$.MODULE$, windowFn.toString()))
.groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
.map(new KeyedByTimestamp<K, V>(windowFn, timestampCombiner), "keyed_by_timestamp")
.fold(new Merge<>(windowFn, timestampCombiner), "merge")
@@ -85,7 +85,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
* A transform used internally to translate Beam's Window to Gearpump's Window.
*/
protected static class GearpumpWindowFn<T, W extends BoundedWindow>
- implements WindowFunction<WindowedValue<T>>, Serializable {
+ implements WindowFunction, Serializable {
private final boolean isNonMerging;
@@ -94,9 +94,14 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
@Override
- public Window[] apply(Context<WindowedValue<T>> context) {
+ public <T> Window[] apply(Context<T> context) {
try {
- return toGearpumpWindows(context.element().getWindows().toArray(new BoundedWindow[0]));
+ Object element = context.element();
+ if (element instanceof TranslatorUtils.RawUnionValue) {
+ element = ((TranslatorUtils.RawUnionValue) element).getValue();
+ }
+ return toGearpumpWindows(((WindowedValue<T>) element).getWindows()
+ .toArray(new BoundedWindow[0]));
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 6e4fbeb..e2777df 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -145,6 +145,7 @@ public class DoFnFunction<InputT, OutputT> extends
Object emptyValue = WindowedValue.of(
Lists.newArrayList(), value.getTimestamp(), sideInputWindow, value.getPane());
sideInputReader.addSideInputValue(sideInput, (WindowedValue<Iterable<?>>) emptyValue);
+ System.out.println(sideInput + " in " + sideInputWindow.toString() + " not ready");
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 60f319d..6637a9b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.gearpump.DefaultMessage;
import org.apache.gearpump.Message;
import org.apache.gearpump.streaming.source.DataSource;
import org.apache.gearpump.streaming.source.Watermark;
@@ -77,9 +78,9 @@ public abstract class GearpumpSource<T> implements DataSource {
if (available) {
T data = reader.getCurrent();
org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
- message = Message.apply(
+ message = new DefaultMessage(
WindowedValue.timestampedValueInGlobalWindow(data, timestamp),
- timestamp.getMillis());
+ TranslatorUtils.jodaTimeToJava8Time(timestamp));
}
available = reader.advance();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index 282f261..83fc6e6 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -76,7 +76,7 @@ public class TranslatorUtils {
JavaStream<WindowedValue<List<?>>> sideInputStream = context.getInputStream(
tagToSideInput.getValue());
mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>(
- tagToSideInput.getKey()), "map_to_RawUnionValue"), "merge_to_MainStream");
+ tagToSideInput.getKey()), "map_to_RawUnionValue"), 1, "merge_to_MainStream");
}
return mainStream;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
index ac12fa4..1262177 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
@@ -113,8 +113,8 @@ public class FlattenPCollectionsTranslatorTest {
when(translationContext.getInputs()).thenReturn(inputs);
when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
when(translationContext.getInputStream(mockCollection2)).thenReturn(javaStream2);
- when(javaStream1.merge(javaStream2, transformName)).thenReturn(mergedStream);
- when(javaStream2.merge(javaStream1, transformName)).thenReturn(mergedStream);
+ when(javaStream1.merge(javaStream2, 1, transformName)).thenReturn(mergedStream);
+ when(javaStream2.merge(javaStream1, 1, transformName)).thenReturn(mergedStream);
when(translationContext.getOutput()).thenReturn(output);
@@ -144,6 +144,6 @@ public class FlattenPCollectionsTranslatorTest {
translator.translate(transform, translationContext);
verify(javaStream1).map(any(MapFunction.class), eq("dummy"));
- verify(javaStream1).merge(any(JavaStream.class), eq(transformName));
+ verify(javaStream1).merge(any(JavaStream.class), eq(1), eq(transformName));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f61822d4/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
index b244484..4490737 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.gearpump.DefaultMessage;
import org.apache.gearpump.Message;
import org.apache.gearpump.streaming.source.Watermark;
import org.junit.Assert;
@@ -77,7 +78,7 @@ public class GearpumpSourceTest {
Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark());
Message expectedMsg =
- Message.apply(
+ new DefaultMessage(
WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()),
value.getTimestamp().getMillis());
Message message = sourceForTest.read();