You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:32 UTC
[37/50] [abbrv] incubator-beam git commit: Fixes Void handling
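Fixes Void handling: when a Flink stream is grouped by a key whose coder is VoidCoder, the key selector now returns VoidCoderTypeSerializer.VoidValue.INSTANCE instead of the (null) key; GroupByNullKeyTest covers this case.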
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a4e9b09f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a4e9b09f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a4e9b09f
Branch: refs/heads/master
Commit: a4e9b09fb4690b4e110afa6bc5744b3646980115
Parents: 067837f
Author: kl0u <kk...@gmail.com>
Authored: Mon Feb 29 16:26:12 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800
----------------------------------------------------------------------
.../streaming/FlinkGroupByKeyWrapper.java | 8 +-
.../flink/dataflow/FlinkTestPipeline.java | 4 +-
.../dataflow/streaming/GroupByNullKeyTest.java | 121 +++++++++++++++++++
3 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
index b0d9e48..24f6d40 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
@@ -16,9 +16,11 @@
package com.dataartisans.flink.dataflow.translation.wrappers.streaming;
import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation;
+import com.dataartisans.flink.dataflow.translation.types.VoidCoderTypeSerializer;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.KV;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
@@ -42,13 +44,15 @@ public class FlinkGroupByKeyWrapper {
public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) {
final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder);
+ final boolean isKeyVoid = keyCoder instanceof VoidCoder;
return inputDataStream.keyBy(
new KeySelectorWithQueryableResultType<K, V>() {
@Override
public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
- return value.getValue().getKey();
+ return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE :
+ value.getValue().getKey();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index 56af3f1..59c3b69 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -59,9 +59,7 @@ public class FlinkTestPipeline extends Pipeline {
*/
private static FlinkTestPipeline create(boolean streaming) {
FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
- FlinkPipelineOptions pipelineOptions = flinkRunner.getPipelineOptions();
- pipelineOptions.setStreaming(streaming);
- return new FlinkTestPipeline(flinkRunner, pipelineOptions);
+ return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
}
private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
new file mode 100644
index 0000000..5a412aa
--- /dev/null
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkTestPipeline;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/** Checks that GroupByKey on a {@code Void} (always-null) key yields a single group per window. */
+public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
+
+	/** Directory the pipeline writes its output to; assigned in {@link #preSubmit()}. */
+	protected String resultPath;
+
+	/** The one null-keyed group; the pipeline sorts its values before writing them. */
+	static final String[] EXPECTED_RESULT = new String[] {
+		"k: null v: user1 user1 user1 user2 user2 user2 user2 user3"
+	};
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+	}
+
+	/**
+	 * Emits each user name timestamped {@code base + key} ms, where {@code base} is the start of the
+	 * hour current when this DoFn is constructed, so all test offsets fall in one one-hour window.
+	 */
+	public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
+		private static final long serialVersionUID = 0;
+		// Read once and serialized with the DoFn: re-reading the clock per element could let the
+		// 0-25s offsets straddle an hour boundary and split the single expected group in two.
+		private final long baseMillis;
+
+		public ExtractUserAndTimestamp() {
+			long now = System.currentTimeMillis();
+			this.baseMillis = now - now % Duration.standardHours(1).getMillis();
+		}
+
+		@Override
+		public void processElement(ProcessContext c) {
+			KV<Integer, String> record = c.element();
+			if (record.getValue() != null) {
+				// Sets the implicit timestamp field to be used in windowing.
+				c.outputWithTimestamp(record.getValue(), new Instant(baseMillis + record.getKey()));
+			}
+		}
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		Pipeline p = FlinkTestPipeline.createForStreaming();
+		PCollection<String> output =
+			p.apply(Create.of(Arrays.asList(
+				KV.<Integer, String>of(0, "user1"),
+				KV.<Integer, String>of(1, "user1"),
+				KV.<Integer, String>of(2, "user1"),
+				KV.<Integer, String>of(10, "user2"),
+				KV.<Integer, String>of(1, "user2"),
+				KV.<Integer, String>of(15000, "user2"),
+				KV.<Integer, String>of(12000, "user2"),
+				KV.<Integer, String>of(25000, "user3"))))
+			.apply(ParDo.of(new ExtractUserAndTimestamp()))
+			.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
+				.triggering(AfterWatermark.pastEndOfWindow())
+				.withAllowedLateness(Duration.ZERO)
+				.discardingFiredPanes())
+			.apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
+				@Override
+				public void processElement(ProcessContext c) throws Exception {
+					c.output(KV.<Void, String>of((Void) null, c.element()));
+				}
+			}))
+			.apply(GroupByKey.<Void, String>create())
+			.apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
+				@Override
+				public void processElement(ProcessContext c) throws Exception {
+					// GroupByKey does not order values; sort them so the output line is deterministic.
+					java.util.List<String> values = com.google.common.collect.Ordering.<String>natural().sortedCopy(c.element().getValue());
+					c.output("k: " + c.element().getKey() + " v: " + Joiner.on(' ').join(values));
+				}
+			}));
+		output.apply(TextIO.Write.to(resultPath));
+		p.run();
+	}
+}