Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:32 UTC

[37/50] [abbrv] incubator-beam git commit: Fixes Void handling

Fixes Void handling in the Flink streaming runner. A GroupByKey over KV<Void, V> produces null keys, which Flink's keyBy machinery cannot hash or serialize. The key selector in FlinkGroupByKeyWrapper now substitutes the VoidCoderTypeSerializer.VoidValue singleton whenever the key coder is a VoidCoder, and a new streaming test (GroupByNullKeyTest) exercises grouping by a null key. A redundant setStreaming call in FlinkTestPipeline is also removed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a4e9b09f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a4e9b09f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a4e9b09f

Branch: refs/heads/master
Commit: a4e9b09fb4690b4e110afa6bc5744b3646980115
Parents: 067837f
Author: kl0u <kk...@gmail.com>
Authored: Mon Feb 29 16:26:12 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../streaming/FlinkGroupByKeyWrapper.java       |   8 +-
 .../flink/dataflow/FlinkTestPipeline.java       |   4 +-
 .../dataflow/streaming/GroupByNullKeyTest.java  | 121 +++++++++++++++++++
 3 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
index b0d9e48..24f6d40 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
@@ -16,9 +16,11 @@
 package com.dataartisans.flink.dataflow.translation.wrappers.streaming;
 
 import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation;
+import com.dataartisans.flink.dataflow.translation.types.VoidCoderTypeSerializer;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -42,13 +44,15 @@ public class FlinkGroupByKeyWrapper {
 	public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) {
 		final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
 		final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder);
+		final boolean isKeyVoid = keyCoder instanceof VoidCoder;
 
 		return inputDataStream.keyBy(
 				new KeySelectorWithQueryableResultType<K, V>() {
 
 					@Override
 					public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
-						return value.getValue().getKey();
+						return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE :
+								value.getValue().getKey();
 					}
 
 					@Override

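For context on the hunk above: keyBy partitions the stream by the extracted key, and a null key (the only value a Void-keyed KV can carry) breaks that machinery. The fix therefore substitutes a shared placeholder whenever the key coder is a VoidCoder, so every null-keyed element hashes to the same key group. Below is a minimal, self-contained sketch of that placeholder pattern; the class and method names are illustrative, not the runner's actual types (the runner uses VoidCoderTypeSerializer.VoidValue.INSTANCE).

// Illustrative sketch of the null-key placeholder pattern; hypothetical
// names, not the runner's actual classes.
public final class NullKeyPlaceholder {

	// The single stand-in used in place of every null key.
	public static final NullKeyPlaceholder INSTANCE = new NullKeyPlaceholder();

	private NullKeyPlaceholder() {}

	// All placeholders hash identically and compare equal, so every
	// null-keyed element lands in the same key group.
	@Override
	public int hashCode() { return 0; }

	@Override
	public boolean equals(Object o) { return o instanceof NullKeyPlaceholder; }

	// Substitute the placeholder only when the key type is Void;
	// real keys pass through untouched.
	@SuppressWarnings("unchecked")
	public static <K> K orPlaceholder(K key, boolean keyIsVoid) {
		return keyIsVoid ? (K) INSTANCE : key;
	}
}

Because all placeholder instances compare equal and hash identically, keying by the placeholder is equivalent to grouping every element under the single null key.
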
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index 56af3f1..59c3b69 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -59,9 +59,7 @@ public class FlinkTestPipeline extends Pipeline {
 	 */
 	private static FlinkTestPipeline create(boolean streaming) {
 		FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
-		FlinkPipelineOptions pipelineOptions = flinkRunner.getPipelineOptions();
-		pipelineOptions.setStreaming(streaming);
-		return new FlinkTestPipeline(flinkRunner, pipelineOptions);
+		return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
 	}
 
 	private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,

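The two deleted lines set the streaming flag a second time, on options obtained back from the runner. The simplification is safe under the assumption the diff implies: FlinkPipelineRunner.createForTest(streaming) already applies the flag to the options it builds. A rough sketch of that assumed factory behavior (the body and the FlinkPipelineOptions import path below are illustrative assumptions, not the actual implementation):

// Sketch, for illustration only, of the factory behavior the
// simplification assumes: the streaming flag is applied once when the
// options are created, so callers need not set it again.
import com.dataartisans.flink.dataflow.FlinkPipelineOptions; // path assumed
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

class CreateForTestSketch {
	static FlinkPipelineOptions testOptions(boolean streaming) {
		FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
		options.setStreaming(streaming); // set once here...
		return options;                  // ...so a second setStreaming is redundant
	}
}
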
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
new file mode 100644
index 0000000..5a412aa
--- /dev/null
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkTestPipeline;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
+
+
+	protected String resultPath;
+
+	static final String[] EXPECTED_RESULT = new String[] {
+			"k: null v: user1 user1 user1 user2 user2 user2 user2 user3"
+	};
+
+	public GroupByNullKeyTest(){
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+	}
+
+	public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
+		private static final long serialVersionUID = 0;
+
+		@Override
+		public void processElement(ProcessContext c) {
+			KV<Integer, String> record = c.element();
+			long now = System.currentTimeMillis();
+			int timestamp = record.getKey();
+			String userName = record.getValue();
+			if (userName != null) {
+				// Sets the implicit timestamp field to be used in windowing.
+				c.outputWithTimestamp(userName, new Instant(timestamp + now));
+			}
+		}
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+
+		Pipeline p = FlinkTestPipeline.createForStreaming();
+
+		PCollection<String> output =
+			p.apply(Create.of(Arrays.asList(
+					KV.<Integer, String>of(0, "user1"),
+					KV.<Integer, String>of(1, "user1"),
+					KV.<Integer, String>of(2, "user1"),
+					KV.<Integer, String>of(10, "user2"),
+					KV.<Integer, String>of(1, "user2"),
+					KV.<Integer, String>of(15000, "user2"),
+					KV.<Integer, String>of(12000, "user2"),
+					KV.<Integer, String>of(25000, "user3"))))
+					.apply(ParDo.of(new ExtractUserAndTimestamp()))
+					.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
+							.triggering(AfterWatermark.pastEndOfWindow())
+							.withAllowedLateness(Duration.ZERO)
+							.discardingFiredPanes())
+
+					.apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
+						@Override
+						public void processElement(ProcessContext c) throws Exception {
+							String elem = c.element();
+							c.output(KV.<Void, String>of((Void) null, elem));
+						}
+					}))
+					.apply(GroupByKey.<Void, String>create())
+					.apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
+						@Override
+						public void processElement(ProcessContext c) throws Exception {
+							KV<Void, Iterable<String>> elem = c.element();
+							StringBuilder str = new StringBuilder();
+							str.append("k: " + elem.getKey() + " v:");
+							for (String v : elem.getValue()) {
+								str.append(" " + v);
+							}
+							c.output(str.toString());
+						}
+					}));
+		output.apply(TextIO.Write.to(resultPath));
+		p.run();
+	}
+}
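
One detail the expected output above highlights: the grouped key still prints as null ("k: null"), because the VoidValue placeholder exists only inside Flink's keying layer; at the SDK level a Void key carries no information at all. A short runnable sketch against the SDK's VoidCoder, showing that a Void key encodes to zero bytes and therefore always decodes back to null:

// Demonstrates why a Void key can only ever round-trip to null:
// VoidCoder writes nothing on encode and returns null on decode.
import com.google.cloud.dataflow.sdk.coders.Coder.Context;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class VoidCoderDemo {
	public static void main(String[] args) throws Exception {
		VoidCoder coder = VoidCoder.of();
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		coder.encode(null, out, Context.OUTER);
		System.out.println("encoded bytes: " + out.size()); // prints 0

		Void decoded = coder.decode(
				new ByteArrayInputStream(out.toByteArray()), Context.OUTER);
		System.out.println("decoded key: " + decoded);      // prints null
	}
}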