You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/05/20 07:15:20 UTC
[02/14] incubator-beam git commit: [BEAM-270] Support
Timestamps/Windows in Flink Batch
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
deleted file mode 100644
index 547f3c3..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.examples.complete.TfIdf;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringDelegateCoder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.net.URI;
-
-
-public class TfIdfITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public TfIdfITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "a", "m", "n", "b", "c", "d"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline pipeline = FlinkTestPipeline.createForBatch();
-
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
- PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
- .apply(Create.of(
- KV.of(new URI("x"), "a b c d"),
- KV.of(new URI("y"), "a b c"),
- KV.of(new URI("z"), "a m n")))
- .apply(new TfIdf.ComputeTfIdf());
-
- PCollection<String> words = wordToUriAndTfIdf
- .apply(Keys.<String>create())
- .apply(RemoveDuplicates.<String>create());
-
- words.apply(TextIO.Write.to(resultPath));
-
- pipeline.run();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
deleted file mode 100644
index 3254e78..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-public class WordCountITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public WordCountITCase(){
- }
-
- static final String[] WORDS_ARRAY = new String[] {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
-
- static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-
- static final String[] COUNTS_ARRAY = new String[] {
- "hi: 5", "there: 1", "sue: 2", "bob: 2"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
-
- input
- .apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()))
- .apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
deleted file mode 100644
index 6570e7d..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-
-public class WordCountJoin2ITCase extends JavaProgramTestBase {
-
- static final String[] WORDS_1 = new String[] {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
-
- static final String[] WORDS_2 = new String[] {
- "hi tim", "beauty", "hooray sue bob",
- "hi there", "", "please say hi"};
-
- static final String[] RESULTS = new String[] {
- "beauty -> Tag1: Tag2: 1",
- "bob -> Tag1: 2 Tag2: 1",
- "hi -> Tag1: 5 Tag2: 3",
- "hooray -> Tag1: Tag2: 1",
- "please -> Tag1: Tag2: 1",
- "say -> Tag1: Tag2: 1",
- "sue -> Tag1: 2 Tag2: 1",
- "there -> Tag1: 1 Tag2: 1",
- "tim -> Tag1: Tag2: 1"
- };
-
- static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
- static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
-
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- /* Create two PCollections and join them */
- PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Count.<String>perElement());
-
- PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Count.<String>perElement());
-
- /* CoGroup the two collections */
- PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
- .of(tag1, occurences1)
- .and(tag2, occurences2)
- .apply(CoGroupByKey.<String>create());
-
- /* Format output */
- mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
- .apply(TextIO.Write.named("test").to(resultPath));
-
- p.run();
- }
-
-
- static class ExtractWordsFn extends DoFn<String, String> {
-
- @Override
- public void startBundle(Context c) {
- }
-
- @Override
- public void processElement(ProcessContext c) {
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
- @Override
- public void processElement(ProcessContext c) {
- CoGbkResult value = c.element().getValue();
- String key = c.element().getKey();
- String countTag1 = tag1.getId() + ": ";
- String countTag2 = tag2.getId() + ": ";
- for (Long count : value.getAll(tag1)) {
- countTag1 += count + " ";
- }
- for (Long count : value.getAll(tag2)) {
- countTag2 += count;
- }
- c.output(key + " -> " + countTag1 + countTag2);
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
deleted file mode 100644
index 60dc74a..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-
-public class WordCountJoin3ITCase extends JavaProgramTestBase {
-
- static final String[] WORDS_1 = new String[] {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
-
- static final String[] WORDS_2 = new String[] {
- "hi tim", "beauty", "hooray sue bob",
- "hi there", "", "please say hi"};
-
- static final String[] WORDS_3 = new String[] {
- "hi stephan", "beauty", "hooray big fabian",
- "hi yo", "", "please say hi"};
-
- static final String[] RESULTS = new String[] {
- "beauty -> Tag1: Tag2: 1 Tag3: 1",
- "bob -> Tag1: 2 Tag2: 1 Tag3: ",
- "hi -> Tag1: 5 Tag2: 3 Tag3: 3",
- "hooray -> Tag1: Tag2: 1 Tag3: 1",
- "please -> Tag1: Tag2: 1 Tag3: 1",
- "say -> Tag1: Tag2: 1 Tag3: 1",
- "sue -> Tag1: 2 Tag2: 1 Tag3: ",
- "there -> Tag1: 1 Tag2: 1 Tag3: ",
- "tim -> Tag1: Tag2: 1 Tag3: ",
- "stephan -> Tag1: Tag2: Tag3: 1",
- "yo -> Tag1: Tag2: Tag3: 1",
- "fabian -> Tag1: Tag2: Tag3: 1",
- "big -> Tag1: Tag2: Tag3: 1"
- };
-
- static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
- static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
- static final TupleTag<Long> tag3 = new TupleTag<>("Tag3");
-
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- /* Create two PCollections and join them */
- PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Count.<String>perElement());
-
- PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Count.<String>perElement());
-
- PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Count.<String>perElement());
-
- /* CoGroup the two collections */
- PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
- .of(tag1, occurences1)
- .and(tag2, occurences2)
- .and(tag3, occurences3)
- .apply(CoGroupByKey.<String>create());
-
- /* Format output */
- mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
- .apply(TextIO.Write.named("test").to(resultPath));
-
- p.run();
- }
-
-
- static class ExtractWordsFn extends DoFn<String, String> {
-
- @Override
- public void startBundle(Context c) {
- }
-
- @Override
- public void processElement(ProcessContext c) {
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
- @Override
- public void processElement(ProcessContext c) {
- CoGbkResult value = c.element().getValue();
- String key = c.element().getKey();
- String countTag1 = tag1.getId() + ": ";
- String countTag2 = tag2.getId() + ": ";
- String countTag3 = tag3.getId() + ": ";
- for (Long count : value.getAll(tag1)) {
- countTag1 += count + " ";
- }
- for (Long count : value.getAll(tag2)) {
- countTag2 += count + " ";
- }
- for (Long count : value.getAll(tag3)) {
- countTag3 += count;
- }
- c.output(key + " -> " + countTag1 + countTag2 + countTag3);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
index c76af65..3e5a17d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -53,7 +54,7 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class GroupAlsoByWindowTest {
+public class GroupAlsoByWindowTest extends StreamingMultipleProgramsTestBase {
private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
deleted file mode 100644
index e6b7f64..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.util;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.api.services.bigquery.model.TableRow;
-
-/**
- * Copied from {@link org.apache.beam.examples.JoinExamples} because the code
- * is private there.
- */
-public class JoinExamples {
-
- // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
- private static final String GDELT_EVENTS_TABLE =
- "clouddataflow-readonly:samples.gdelt_sample";
- // A table that maps country codes to country names.
- private static final String COUNTRY_CODES =
- "gdelt-bq:full.crosswalk_geocountrycodetohuman";
-
- /**
- * Join two collections, using country code as the key.
- */
- public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
- PCollection<TableRow> countryCodes) throws Exception {
-
- final TupleTag<String> eventInfoTag = new TupleTag<>();
- final TupleTag<String> countryInfoTag = new TupleTag<>();
-
- // transform both input collections to tuple collections, where the keys are country
- // codes in both cases.
- PCollection<KV<String, String>> eventInfo = eventsTable.apply(
- ParDo.of(new ExtractEventDataFn()));
- PCollection<KV<String, String>> countryInfo = countryCodes.apply(
- ParDo.of(new ExtractCountryInfoFn()));
-
- // country code 'key' -> CGBKR (<event info>, <country name>)
- PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
- .of(eventInfoTag, eventInfo)
- .and(countryInfoTag, countryInfo)
- .apply(CoGroupByKey.<String>create());
-
- // Process the CoGbkResult elements generated by the CoGroupByKey transform.
- // country code 'key' -> string of <event info>, <country name>
- PCollection<KV<String, String>> finalResultCollection =
- kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<String, CoGbkResult> e = c.element();
- CoGbkResult val = e.getValue();
- String countryCode = e.getKey();
- String countryName;
- countryName = e.getValue().getOnly(countryInfoTag, "Kostas");
- for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
- // Generate a string that combines information from both collection values
- c.output(KV.of(countryCode, "Country name: " + countryName
- + ", Event info: " + eventInfo));
- }
- }
- }));
-
- // write to GCS
- return finalResultCollection
- .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
- @Override
- public void processElement(ProcessContext c) {
- String outputstring = "Country code: " + c.element().getKey()
- + ", " + c.element().getValue();
- c.output(outputstring);
- }
- }));
- }
-
- /**
- * Examines each row (event) in the input table. Output a KV with the key the country
- * code of the event, and the value a string encoding event information.
- */
- static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
- @Override
- public void processElement(ProcessContext c) {
- TableRow row = c.element();
- String countryCode = (String) row.get("ActionGeo_CountryCode");
- String sqlDate = (String) row.get("SQLDATE");
- String actor1Name = (String) row.get("Actor1Name");
- String sourceUrl = (String) row.get("SOURCEURL");
- String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
- c.output(KV.of(countryCode, eventInfo));
- }
- }
-
-
- /**
- * Examines each row (country info) in the input table. Output a KV with the key the country
- * code, and the value the country name.
- */
- static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
- @Override
- public void processElement(ProcessContext c) {
- TableRow row = c.element();
- String countryCode = (String) row.get("FIPSCC");
- String countryName = (String) row.get("HumanName");
- c.output(KV.of(countryCode, countryName));
- }
- }
-
-
- /**
- * Options supported by {@link JoinExamples}.
- * <p>
- * Inherits standard configuration options.
- */
- private interface Options extends PipelineOptions {
- @Description("Path of the file to write to")
- @Validation.Required
- String getOutput();
- void setOutput(String value);
- }
-
- public static void main(String[] args) throws Exception {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- Pipeline p = Pipeline.create(options);
- // the following two 'applys' create multiple inputs to our pipeline, one for each
- // of our two input sources.
- PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
- PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
- PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
- formattedResults.apply(TextIO.Write.to(options.getOutput()));
- p.run();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
index 2ca7014..29240e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
@@ -35,7 +35,7 @@ import java.util.List;
/**
* A UnionCoder encodes RawUnionValues.
*/
-class UnionCoder extends StandardCoder<RawUnionValue> {
+public class UnionCoder extends StandardCoder<RawUnionValue> {
// TODO: Think about how to integrate this with a schema object (i.e.
// a tuple of tuple tags).
/**