You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:36 UTC
[41/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
index 1525c80..9f602fd 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
@@ -25,103 +25,103 @@ import java.util.concurrent.TimeUnit;
public class StateCheckpointWriter {
- private final AbstractStateBackend.CheckpointStateOutputView output;
-
- public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) {
- return new StateCheckpointWriter(output);
- }
-
- private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) {
- this.output = output;
- }
-
- ///////// Creating the serialized versions of the different types of state held by dataflow ///////
-
- public StateCheckpointWriter addValueBuilder() throws IOException {
- validate();
- StateType.serialize(StateType.VALUE, this);
- return this;
- }
-
- public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
- validate();
- StateType.serialize(StateType.WATERMARK, this);
- return this;
- }
-
- public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
- validate();
- StateType.serialize(StateType.LIST, this);
- return this;
- }
-
- public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
- validate();
- StateType.serialize(StateType.ACCUMULATOR, this);
- return this;
- }
-
- ///////// Setting the tag for a given state element ///////
-
- public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
- return writeData(stateKey.toByteArray());
- }
-
- public StateCheckpointWriter setTag(String stateKey) throws IOException {
- output.writeUTF(stateKey);
- return this;
- }
-
-
- public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException {
- return serializeObject(key, keySerializer);
- }
-
- public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException {
- objectSerializer.serialize(object, output);
- return this;
- }
-
- ///////// Write the actual serialized data //////////
-
- public StateCheckpointWriter setData(ByteString data) throws IOException {
- return writeData(data.toByteArray());
- }
-
- public StateCheckpointWriter setData(byte[] data) throws IOException {
- return writeData(data);
- }
-
- public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException {
- validate();
- output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
- return this;
- }
-
- public StateCheckpointWriter writeInt(int number) throws IOException {
- validate();
- output.writeInt(number);
- return this;
- }
-
- public StateCheckpointWriter writeByte(byte b) throws IOException {
- validate();
- output.writeByte(b);
- return this;
- }
-
- ///////// Helper Methods ///////
-
- private StateCheckpointWriter writeData(byte[] data) throws IOException {
- validate();
- output.writeInt(data.length);
- output.write(data);
- return this;
- }
-
- private void validate() {
- if (this.output == null) {
- throw new RuntimeException("StateBackend not initialized yet.");
- }
- }
+ private final AbstractStateBackend.CheckpointStateOutputView output;
+
+ public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) {
+ return new StateCheckpointWriter(output);
+ }
+
+ private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) {
+ this.output = output;
+ }
+
+ ///////// Creating the serialized versions of the different types of state held by dataflow ///////
+
+ public StateCheckpointWriter addValueBuilder() throws IOException {
+ validate();
+ StateType.serialize(StateType.VALUE, this);
+ return this;
+ }
+
+ public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
+ validate();
+ StateType.serialize(StateType.WATERMARK, this);
+ return this;
+ }
+
+ public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
+ validate();
+ StateType.serialize(StateType.LIST, this);
+ return this;
+ }
+
+ public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
+ validate();
+ StateType.serialize(StateType.ACCUMULATOR, this);
+ return this;
+ }
+
+ ///////// Setting the tag for a given state element ///////
+
+ public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
+ return writeData(stateKey.toByteArray());
+ }
+
+ public StateCheckpointWriter setTag(String stateKey) throws IOException {
+ output.writeUTF(stateKey);
+ return this;
+ }
+
+
+ public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException {
+ return serializeObject(key, keySerializer);
+ }
+
+ public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException {
+ objectSerializer.serialize(object, output);
+ return this;
+ }
+
+ ///////// Write the actual serialized data //////////
+
+ public StateCheckpointWriter setData(ByteString data) throws IOException {
+ return writeData(data.toByteArray());
+ }
+
+ public StateCheckpointWriter setData(byte[] data) throws IOException {
+ return writeData(data);
+ }
+
+ public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException {
+ validate();
+ output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
+ return this;
+ }
+
+ public StateCheckpointWriter writeInt(int number) throws IOException {
+ validate();
+ output.writeInt(number);
+ return this;
+ }
+
+ public StateCheckpointWriter writeByte(byte b) throws IOException {
+ validate();
+ output.writeByte(b);
+ return this;
+ }
+
+ ///////// Helper Methods ///////
+
+ private StateCheckpointWriter writeData(byte[] data) throws IOException {
+ validate();
+ output.writeInt(data.length);
+ output.write(data);
+ return this;
+ }
+
+ private void validate() {
+ if (this.output == null) {
+ throw new RuntimeException("StateBackend not initialized yet.");
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
index aa049ef..9e2c9f8 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
@@ -23,49 +23,49 @@ import java.io.IOException;
* */
public enum StateType {
- VALUE(0),
+ VALUE(0),
- WATERMARK(1),
+ WATERMARK(1),
- LIST(2),
+ LIST(2),
- ACCUMULATOR(3);
+ ACCUMULATOR(3);
- private final int numVal;
+ private final int numVal;
- StateType(int value) {
- this.numVal = value;
- }
+ StateType(int value) {
+ this.numVal = value;
+ }
- public static void serialize(StateType type, StateCheckpointWriter output) throws IOException {
- if (output == null) {
- throw new IllegalArgumentException("Cannot write to a null output.");
- }
+ public static void serialize(StateType type, StateCheckpointWriter output) throws IOException {
+ if (output == null) {
+ throw new IllegalArgumentException("Cannot write to a null output.");
+ }
- if(type.numVal < 0 || type.numVal > 3) {
- throw new RuntimeException("Unknown State Type " + type + ".");
- }
+ if(type.numVal < 0 || type.numVal > 3) {
+ throw new RuntimeException("Unknown State Type " + type + ".");
+ }
- output.writeByte((byte) type.numVal);
- }
+ output.writeByte((byte) type.numVal);
+ }
- public static StateType deserialize(StateCheckpointReader input) throws IOException {
- if (input == null) {
- throw new IllegalArgumentException("Cannot read from a null input.");
- }
+ public static StateType deserialize(StateCheckpointReader input) throws IOException {
+ if (input == null) {
+ throw new IllegalArgumentException("Cannot read from a null input.");
+ }
- int typeInt = (int) input.getByte();
- if(typeInt < 0 || typeInt > 3) {
- throw new RuntimeException("Unknown State Type " + typeInt + ".");
- }
+ int typeInt = (int) input.getByte();
+ if(typeInt < 0 || typeInt > 3) {
+ throw new RuntimeException("Unknown State Type " + typeInt + ".");
+ }
- StateType resultType = null;
- for(StateType st: values()) {
- if(st.numVal == typeInt) {
- resultType = st;
- break;
- }
- }
- return resultType;
- }
+ StateType resultType = null;
+ for(StateType st: values()) {
+ if(st.numVal == typeInt) {
+ resultType = st;
+ break;
+ }
+ }
+ return resultType;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
index ce53d44..3272975 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
@@ -30,70 +30,70 @@ import org.apache.flink.test.util.JavaProgramTestBase;
public class AvroITCase extends JavaProgramTestBase {
- protected String resultPath;
- protected String tmpPath;
-
- public AvroITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "Joe red 3",
- "Mary blue 4",
- "Mark green 1",
- "Julia purple 5"
- };
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- tmpPath = getTempDirPath("tmp");
-
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- runProgram(tmpPath, resultPath);
- }
-
- private static void runProgram(String tmpPath, String resultPath) {
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- p
- .apply(Create.of(
- new User("Joe", 3, "red"),
- new User("Mary", 4, "blue"),
- new User("Mark", 1, "green"),
- new User("Julia", 5, "purple"))
- .withCoder(AvroCoder.of(User.class)))
-
- .apply(AvroIO.Write.to(tmpPath)
- .withSchema(User.class));
-
- p.run();
-
- p = FlinkTestPipeline.createForBatch();
-
- p
- .apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation())
-
- .apply(ParDo.of(new DoFn<User, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- User u = c.element();
- String result = u.getName() + " " + u.getFavoriteColor() + " " + u.getFavoriteNumber();
- c.output(result);
- }
- }))
-
- .apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
+ protected String resultPath;
+ protected String tmpPath;
+
+ public AvroITCase(){
+ }
+
+ static final String[] EXPECTED_RESULT = new String[] {
+ "Joe red 3",
+ "Mary blue 4",
+ "Mark green 1",
+ "Julia purple 5"
+ };
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ tmpPath = getTempDirPath("tmp");
+
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ runProgram(tmpPath, resultPath);
+ }
+
+ private static void runProgram(String tmpPath, String resultPath) {
+ Pipeline p = FlinkTestPipeline.createForBatch();
+
+ p
+ .apply(Create.of(
+ new User("Joe", 3, "red"),
+ new User("Mary", 4, "blue"),
+ new User("Mark", 1, "green"),
+ new User("Julia", 5, "purple"))
+ .withCoder(AvroCoder.of(User.class)))
+
+ .apply(AvroIO.Write.to(tmpPath)
+ .withSchema(User.class));
+
+ p.run();
+
+ p = FlinkTestPipeline.createForBatch();
+
+ p
+ .apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation())
+
+ .apply(ParDo.of(new DoFn<User, String>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ User u = c.element();
+ String result = u.getName() + " " + u.getFavoriteColor() + " " + u.getFavoriteNumber();
+ c.output(result);
+ }
+ }))
+
+ .apply(TextIO.Write.to(resultPath));
+
+ p.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
index 928388c..e65e497 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
@@ -26,47 +26,47 @@ import org.apache.flink.test.util.JavaProgramTestBase;
public class FlattenizeITCase extends JavaProgramTestBase {
- private String resultPath;
- private String resultPath2;
+ private String resultPath;
+ private String resultPath2;
- private static final String[] words = {"hello", "this", "is", "a", "DataSet!"};
- private static final String[] words2 = {"hello", "this", "is", "another", "DataSet!"};
- private static final String[] words3 = {"hello", "this", "is", "yet", "another", "DataSet!"};
+ private static final String[] words = {"hello", "this", "is", "a", "DataSet!"};
+ private static final String[] words2 = {"hello", "this", "is", "another", "DataSet!"};
+ private static final String[] words3 = {"hello", "this", "is", "yet", "another", "DataSet!"};
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- resultPath2 = getTempDirPath("result2");
- }
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ resultPath2 = getTempDirPath("result2");
+ }
- @Override
- protected void postSubmit() throws Exception {
- String join = Joiner.on('\n').join(words);
- String join2 = Joiner.on('\n').join(words2);
- String join3 = Joiner.on('\n').join(words3);
- compareResultsByLinesInMemory(join + "\n" + join2, resultPath);
- compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, resultPath2);
- }
+ @Override
+ protected void postSubmit() throws Exception {
+ String join = Joiner.on('\n').join(words);
+ String join2 = Joiner.on('\n').join(words2);
+ String join3 = Joiner.on('\n').join(words3);
+ compareResultsByLinesInMemory(join + "\n" + join2, resultPath);
+ compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, resultPath2);
+ }
- @Override
- protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.createForBatch();
+ @Override
+ protected void testProgram() throws Exception {
+ Pipeline p = FlinkTestPipeline.createForBatch();
- PCollection<String> p1 = p.apply(Create.of(words));
- PCollection<String> p2 = p.apply(Create.of(words2));
+ PCollection<String> p1 = p.apply(Create.of(words));
+ PCollection<String> p2 = p.apply(Create.of(words2));
- PCollectionList<String> list = PCollectionList.of(p1).and(p2);
+ PCollectionList<String> list = PCollectionList.of(p1).and(p2);
- list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath));
+ list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath));
- PCollection<String> p3 = p.apply(Create.of(words3));
+ PCollection<String> p3 = p.apply(Create.of(words3));
- PCollectionList<String> list2 = list.and(p3);
+ PCollectionList<String> list2 = list.and(p3);
- list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2));
+ list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2));
- p.run();
- }
+ p.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index 59c3b69..578e0e1 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -26,45 +26,45 @@ import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
*/
public class FlinkTestPipeline extends Pipeline {
- /**
- * Creates and returns a new test pipeline for batch execution.
- *
- * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
- */
- public static FlinkTestPipeline createForBatch() {
- return create(false);
- }
+ /**
+ * Creates and returns a new test pipeline for batch execution.
+ *
+ * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+ * {@link Pipeline#run} to execute the pipeline and check the tests.
+ */
+ public static FlinkTestPipeline createForBatch() {
+ return create(false);
+ }
- /**
- * Creates and returns a new test pipeline for streaming execution.
- *
- * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
- *
- * @return The Test Pipeline
- */
- public static FlinkTestPipeline createForStreaming() {
- return create(true);
- }
+ /**
+ * Creates and returns a new test pipeline for streaming execution.
+ *
+ * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+ * {@link Pipeline#run} to execute the pipeline and check the tests.
+ *
+ * @return The Test Pipeline
+ */
+ public static FlinkTestPipeline createForStreaming() {
+ return create(true);
+ }
- /**
- * Creates and returns a new test pipeline for streaming or batch execution.
- *
- * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
- * {@link Pipeline#run} to execute the pipeline and check the tests.
- *
- * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch.
- * @return The Test Pipeline.
- */
- private static FlinkTestPipeline create(boolean streaming) {
- FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
- return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
- }
+ /**
+ * Creates and returns a new test pipeline for streaming or batch execution.
+ *
+ * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+ * {@link Pipeline#run} to execute the pipeline and check the tests.
+ *
+ * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch.
+ * @return The Test Pipeline.
+ */
+ private static FlinkTestPipeline create(boolean streaming) {
+ FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
+ return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
+ }
- private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
- PipelineOptions options) {
- super(runner, options);
- }
+ private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
+ PipelineOptions options) {
+ super(runner, options);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
index af0f217..28861ea 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
@@ -34,66 +34,66 @@ import java.util.List;
*/
public class JoinExamplesITCase extends JavaProgramTestBase {
- protected String resultPath;
-
- public JoinExamplesITCase(){
- }
-
- private static final TableRow row1 = new TableRow()
- .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
- .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com");
- private static final TableRow row2 = new TableRow()
- .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
- .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com");
- private static final TableRow row3 = new TableRow()
- .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213")
- .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com");
- static final TableRow[] EVENTS = new TableRow[] {
- row1, row2, row3
- };
- static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
-
- private static final TableRow cc1 = new TableRow()
- .set("FIPSCC", "VM").set("HumanName", "Vietnam");
- private static final TableRow cc2 = new TableRow()
- .set("FIPSCC", "BE").set("HumanName", "Belgium");
- static final TableRow[] CCS = new TableRow[] {
- cc1, cc2
- };
- static final List<TableRow> CC_ARRAY = Arrays.asList(CCS);
-
- static final String[] JOINED_EVENTS = new String[] {
- "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, "
- + "url: http://www.chicagotribune.com",
- "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, "
- + "url: http://cnn.com",
- "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, "
- + "url: http://cnn.com"
- };
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
- PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));
-
- PCollection<String> output = JoinExamples.joinEvents(input1, input2);
-
- output.apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
+ protected String resultPath;
+
+ public JoinExamplesITCase(){
+ }
+
+ private static final TableRow row1 = new TableRow()
+ .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
+ .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com");
+ private static final TableRow row2 = new TableRow()
+ .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
+ .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com");
+ private static final TableRow row3 = new TableRow()
+ .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213")
+ .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com");
+ static final TableRow[] EVENTS = new TableRow[] {
+ row1, row2, row3
+ };
+ static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
+
+ private static final TableRow cc1 = new TableRow()
+ .set("FIPSCC", "VM").set("HumanName", "Vietnam");
+ private static final TableRow cc2 = new TableRow()
+ .set("FIPSCC", "BE").set("HumanName", "Belgium");
+ static final TableRow[] CCS = new TableRow[] {
+ cc1, cc2
+ };
+ static final List<TableRow> CC_ARRAY = Arrays.asList(CCS);
+
+ static final String[] JOINED_EVENTS = new String[] {
+ "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, "
+ + "url: http://www.chicagotribune.com",
+ "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, "
+ + "url: http://cnn.com",
+ "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, "
+ + "url: http://cnn.com"
+ };
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+
+ Pipeline p = FlinkTestPipeline.createForBatch();
+
+ PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
+ PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));
+
+ PCollection<String> output = JoinExamples.joinEvents(input1, input2);
+
+ output.apply(TextIO.Write.to(resultPath));
+
+ p.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
index 35f2eaf..d1652e7 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
@@ -27,37 +27,37 @@ import java.io.Serializable;
public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Serializable {
- protected String resultPath;
-
- protected final String expected = "test";
-
- public MaybeEmptyTestITCase() {
- }
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
- .apply(ParDo.of(
- new DoFn<Void, String>() {
- @Override
- public void processElement(DoFn<Void, String>.ProcessContext c) {
- c.output(expected);
- }
- })).apply(TextIO.Write.to(resultPath));
- p.run();
- }
+ protected String resultPath;
+
+ protected final String expected = "test";
+
+ public MaybeEmptyTestITCase() {
+ }
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+
+ Pipeline p = FlinkTestPipeline.createForBatch();
+
+ p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
+ .apply(ParDo.of(
+ new DoFn<Void, String>() {
+ @Override
+ public void processElement(DoFn<Void, String>.ProcessContext c) {
+ c.output(expected);
+ }
+ })).apply(TextIO.Write.to(resultPath));
+ p.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
index ccdbbf9..d8087d6 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
@@ -31,68 +31,68 @@ import java.io.Serializable;
public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Serializable {
- private String resultPath;
-
- private static String[] expectedWords = {"MAAA", "MAAFOOO"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
-
- // Select words whose length is below a cut off,
- // plus the lengths of words that are above the cut off.
- // Also select words starting with "MARKER".
- final int wordLengthCutOff = 3;
- // Create tags to use for the main and side outputs.
- final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
- final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){};
- final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
-
- PCollectionTuple results =
- words.apply(ParDo
- .withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag)
- .and(markedWordsTag))
- .of(new DoFn<String, String>() {
- final TupleTag<String> specialWordsTag = new TupleTag<String>() {
- };
-
- public void processElement(ProcessContext c) {
- String word = c.element();
- if (word.length() <= wordLengthCutOff) {
- c.output(word);
- } else {
- c.sideOutput(wordLengthsAboveCutOffTag, word.length());
- }
- if (word.startsWith("MAA")) {
- c.sideOutput(markedWordsTag, word);
- }
-
- if (word.startsWith("SPECIAL")) {
- c.sideOutput(specialWordsTag, word);
- }
- }
- }));
-
- // Extract the PCollection results, by tag.
- PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
- PCollection<Integer> wordLengthsAboveCutOff = results.get
- (wordLengthsAboveCutOffTag);
- PCollection<String> markedWords = results.get(markedWordsTag);
-
- markedWords.apply(TextIO.Write.to(resultPath));
-
- p.run();
- }
+ private String resultPath;
+
+ private static String[] expectedWords = {"MAAA", "MAAFOOO"};
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ Pipeline p = FlinkTestPipeline.createForBatch();
+
+ PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
+
+ // Select words whose length is below a cut off,
+ // plus the lengths of words that are above the cut off.
+ // Also select words starting with "MARKER".
+ final int wordLengthCutOff = 3;
+ // Create tags to use for the main and side outputs.
+ final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
+ final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){};
+ final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
+
+ PCollectionTuple results =
+ words.apply(ParDo
+ .withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag)
+ .and(markedWordsTag))
+ .of(new DoFn<String, String>() {
+ final TupleTag<String> specialWordsTag = new TupleTag<String>() {
+ };
+
+ public void processElement(ProcessContext c) {
+ String word = c.element();
+ if (word.length() <= wordLengthCutOff) {
+ c.output(word);
+ } else {
+ c.sideOutput(wordLengthsAboveCutOffTag, word.length());
+ }
+ if (word.startsWith("MAA")) {
+ c.sideOutput(markedWordsTag, word);
+ }
+
+ if (word.startsWith("SPECIAL")) {
+ c.sideOutput(specialWordsTag, word);
+ }
+ }
+ }));
+
+ // Extract the PCollection results, by tag.
+ PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
+ PCollection<Integer> wordLengthsAboveCutOff = results.get
+ (wordLengthsAboveCutOffTag);
+ PCollection<String> markedWords = results.get(markedWordsTag);
+
+ markedWords.apply(TextIO.Write.to(resultPath));
+
+ p.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
index 3569a78..5a46359 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
@@ -36,128 +36,128 @@ import java.util.List;
public class ReadSourceITCase extends JavaProgramTestBase {
- protected String resultPath;
-
- public ReadSourceITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "1", "2", "3", "4", "5", "6", "7", "8", "9"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- runProgram(resultPath);
- }
-
- private static void runProgram(String resultPath) {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- PCollection<String> result = p
- .apply(Read.from(new ReadSource(1, 10)))
- .apply(ParDo.of(new DoFn<Integer, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element().toString());
- }
- }));
-
- result.apply(TextIO.Write.to(resultPath));
- p.run();
- }
-
-
- private static class ReadSource extends BoundedSource<Integer> {
- final int from;
- final int to;
-
- ReadSource(int from, int to) {
- this.from = from;
- this.to = to;
- }
-
- @Override
- public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options)
- throws Exception {
- List<ReadSource> res = new ArrayList<>();
- FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
- int numWorkers = flinkOptions.getParallelism();
- Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0.");
-
- float step = 1.0f * (to - from) / numWorkers;
- for (int i = 0; i < numWorkers; ++i) {
- res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step)));
- }
- return res;
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return 8 * (to - from);
- }
-
- @Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
- return true;
- }
-
- @Override
- public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
- return new RangeReader(this);
- }
-
- @Override
- public void validate() {}
-
- @Override
- public Coder<Integer> getDefaultOutputCoder() {
- return BigEndianIntegerCoder.of();
- }
-
- private class RangeReader extends BoundedReader<Integer> {
- private int current;
-
- public RangeReader(ReadSource source) {
- this.current = source.from - 1;
- }
-
- @Override
- public boolean start() throws IOException {
- return true;
- }
-
- @Override
- public boolean advance() throws IOException {
- current++;
- return (current < to);
- }
-
- @Override
- public Integer getCurrent() {
- return current;
- }
-
- @Override
- public void close() throws IOException {
- // Nothing
- }
-
- @Override
- public BoundedSource<Integer> getCurrentSource() {
- return ReadSource.this;
- }
- }
- }
+ protected String resultPath;
+
+ public ReadSourceITCase(){
+ }
+
+ static final String[] EXPECTED_RESULT = new String[] {
+ "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ runProgram(resultPath);
+ }
+
+ private static void runProgram(String resultPath) {
+
+ Pipeline p = FlinkTestPipeline.createForBatch();
+
+ PCollection<String> result = p
+ .apply(Read.from(new ReadSource(1, 10)))
+ .apply(ParDo.of(new DoFn<Integer, String>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element().toString());
+ }
+ }));
+
+ result.apply(TextIO.Write.to(resultPath));
+ p.run();
+ }
+
+
+ private static class ReadSource extends BoundedSource<Integer> {
+ final int from;
+ final int to;
+
+ ReadSource(int from, int to) {
+ this.from = from;
+ this.to = to;
+ }
+
+ @Override
+ public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options)
+ throws Exception {
+ List<ReadSource> res = new ArrayList<>();
+ FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
+ int numWorkers = flinkOptions.getParallelism();
+ Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0.");
+
+ float step = 1.0f * (to - from) / numWorkers;
+ for (int i = 0; i < numWorkers; ++i) {
+ res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step)));
+ }
+ return res;
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return 8 * (to - from);
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return true;
+ }
+
+ @Override
+ public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
+ return new RangeReader(this);
+ }
+
+ @Override
+ public void validate() {}
+
+ @Override
+ public Coder<Integer> getDefaultOutputCoder() {
+ return BigEndianIntegerCoder.of();
+ }
+
+ private class RangeReader extends BoundedReader<Integer> {
+ private int current;
+
+ public RangeReader(ReadSource source) {
+ this.current = source.from - 1;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return true;
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ current++;
+ return (current < to);
+ }
+
+ @Override
+ public Integer getCurrent() {
+ return current;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Nothing
+ }
+
+ @Override
+ public BoundedSource<Integer> getCurrentSource() {
+ return ReadSource.this;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
index db794f7..615f194 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
@@ -30,39 +30,39 @@ import java.util.List;
public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase {
- protected String resultPath;
+ protected String resultPath;
- public RemoveDuplicatesEmptyITCase(){
- }
+ public RemoveDuplicatesEmptyITCase(){
+ }
- static final String[] EXPECTED_RESULT = new String[] {};
+ static final String[] EXPECTED_RESULT = new String[] {};
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
- @Override
- protected void testProgram() throws Exception {
+ @Override
+ protected void testProgram() throws Exception {
- List<String> strings = Collections.emptyList();
+ List<String> strings = Collections.emptyList();
- Pipeline p = FlinkTestPipeline.createForBatch();
+ Pipeline p = FlinkTestPipeline.createForBatch();
- PCollection<String> input =
- p.apply(Create.of(strings))
- .setCoder(StringUtf8Coder.of());
+ PCollection<String> input =
+ p.apply(Create.of(strings))
+ .setCoder(StringUtf8Coder.of());
- PCollection<String> output =
- input.apply(RemoveDuplicates.<String>create());
+ PCollection<String> output =
+ input.apply(RemoveDuplicates.<String>create());
- output.apply(TextIO.Write.to(resultPath));
- p.run();
- }
+ output.apply(TextIO.Write.to(resultPath));
+ p.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
index 04e06b8..8c19f2c 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
@@ -30,40 +30,40 @@ import java.util.List;
public class RemoveDuplicatesITCase extends JavaProgramTestBase {
- protected String resultPath;
+ protected String resultPath;
- public RemoveDuplicatesITCase(){
- }
+ public RemoveDuplicatesITCase(){
+ }
- static final String[] EXPECTED_RESULT = new String[] {
- "k1", "k5", "k2", "k3"};
+ static final String[] EXPECTED_RESULT = new String[] {
+ "k1", "k5", "k2", "k3"};
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
- @Override
- protected void testProgram() throws Exception {
+ @Override
+ protected void testProgram() throws Exception {
- List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");
+ List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");
- Pipeline p = FlinkTestPipeline.createForBatch();
+ Pipeline p = FlinkTestPipeline.createForBatch();
- PCollection<String> input =
- p.apply(Create.of(strings))
- .setCoder(StringUtf8Coder.of());
+ PCollection<String> input =
+ p.apply(Create.of(strings))
+ .setCoder(StringUtf8Coder.of());
- PCollection<String> output =
- input.apply(RemoveDuplicates.<String>create());
+ PCollection<String> output =
+ input.apply(RemoveDuplicates.<String>create());
- output.apply(TextIO.Write.to(resultPath));
- p.run();
- }
+ output.apply(TextIO.Write.to(resultPath));
+ p.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
index ee8843c..7c3d6f9 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
@@ -28,40 +28,40 @@ import java.io.Serializable;
public class SideInputITCase extends JavaProgramTestBase implements Serializable {
- private static final String expected = "Hello!";
+ private static final String expected = "Hello!";
- protected String resultPath;
+ protected String resultPath;
- @Override
- protected void testProgram() throws Exception {
+ @Override
+ protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.createForBatch();
+ Pipeline p = FlinkTestPipeline.createForBatch();
- final PCollectionView<String> sidesInput = p
- .apply(Create.of(expected))
- .apply(View.<String>asSingleton());
+ final PCollectionView<String> sidesInput = p
+ .apply(Create.of(expected))
+ .apply(View.<String>asSingleton());
- p.apply(Create.of("bli"))
- .apply(ParDo.of(new DoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- String s = c.sideInput(sidesInput);
- c.output(s);
- }
- }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath));
+ p.apply(Create.of("bli"))
+ .apply(ParDo.of(new DoFn<String, String>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ String s = c.sideInput(sidesInput);
+ c.output(s);
+ }
+ }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath));
- p.run();
- }
+ p.run();
+ }
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expected, resultPath);
- }
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
index 07c1294..715d0be 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
@@ -32,45 +32,45 @@ import java.net.URI;
public class TfIdfITCase extends JavaProgramTestBase {
- protected String resultPath;
+ protected String resultPath;
- public TfIdfITCase(){
- }
+ public TfIdfITCase(){
+ }
- static final String[] EXPECTED_RESULT = new String[] {
- "a", "m", "n", "b", "c", "d"};
+ static final String[] EXPECTED_RESULT = new String[] {
+ "a", "m", "n", "b", "c", "d"};
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
- @Override
- protected void testProgram() throws Exception {
+ @Override
+ protected void testProgram() throws Exception {
- Pipeline pipeline = FlinkTestPipeline.createForBatch();
+ Pipeline pipeline = FlinkTestPipeline.createForBatch();
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+ pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
- PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
- .apply(Create.of(
- KV.of(new URI("x"), "a b c d"),
- KV.of(new URI("y"), "a b c"),
- KV.of(new URI("z"), "a m n")))
- .apply(new TFIDF.ComputeTfIdf());
+ PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
+ .apply(Create.of(
+ KV.of(new URI("x"), "a b c d"),
+ KV.of(new URI("y"), "a b c"),
+ KV.of(new URI("z"), "a m n")))
+ .apply(new TFIDF.ComputeTfIdf());
- PCollection<String> words = wordToUriAndTfIdf
- .apply(Keys.<String>create())
- .apply(RemoveDuplicates.<String>create());
+ PCollection<String> words = wordToUriAndTfIdf
+ .apply(Keys.<String>create())
+ .apply(RemoveDuplicates.<String>create());
- words.apply(TextIO.Write.to(resultPath));
+ words.apply(TextIO.Write.to(resultPath));
- pipeline.run();
- }
+ pipeline.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
index 9188097..f1a2454 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
@@ -32,43 +32,43 @@ import java.util.List;
public class WordCountITCase extends JavaProgramTestBase {
- protected String resultPath;
+ protected String resultPath;
- public WordCountITCase(){
- }
+ public WordCountITCase(){
+ }
- static final String[] WORDS_ARRAY = new String[] {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
+ static final String[] WORDS_ARRAY = new String[] {
+ "hi there", "hi", "hi sue bob",
+ "hi sue", "", "bob hi"};
- static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+ static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
- static final String[] COUNTS_ARRAY = new String[] {
- "hi: 5", "there: 1", "sue: 2", "bob: 2"};
+ static final String[] COUNTS_ARRAY = new String[] {
+ "hi: 5", "there: 1", "sue: 2", "bob: 2"};
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath);
- }
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath);
+ }
- @Override
- protected void testProgram() throws Exception {
+ @Override
+ protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.createForBatch();
+ Pipeline p = FlinkTestPipeline.createForBatch();
- PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+ PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
- input
- .apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()))
- .apply(TextIO.Write.to(resultPath));
+ input
+ .apply(new WordCount.CountWords())
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+ .apply(TextIO.Write.to(resultPath));
- p.run();
- }
+ p.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
index ccc52c4..1cac036 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
@@ -33,104 +33,104 @@ import org.apache.flink.test.util.JavaProgramTestBase;
public class WordCountJoin2ITCase extends JavaProgramTestBase {
- static final String[] WORDS_1 = new String[] {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
-
- static final String[] WORDS_2 = new String[] {
- "hi tim", "beauty", "hooray sue bob",
- "hi there", "", "please say hi"};
-
- static final String[] RESULTS = new String[] {
- "beauty -> Tag1: Tag2: 1",
- "bob -> Tag1: 2 Tag2: 1",
- "hi -> Tag1: 5 Tag2: 3",
- "hooray -> Tag1: Tag2: 1",
- "please -> Tag1: Tag2: 1",
- "say -> Tag1: Tag2: 1",
- "sue -> Tag1: 2 Tag2: 1",
- "there -> Tag1: 1 Tag2: 1",
- "tim -> Tag1: Tag2: 1"
- };
-
- static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
- static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
-
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- /* Create two PCollections and join them */
- PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Count.<String>perElement());
-
- PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Count.<String>perElement());
-
- /* CoGroup the two collections */
- PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
- .of(tag1, occurences1)
- .and(tag2, occurences2)
- .apply(CoGroupByKey.<String>create());
-
- /* Format output */
- mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
- .apply(TextIO.Write.named("test").to(resultPath));
-
- p.run();
- }
-
-
- static class ExtractWordsFn extends DoFn<String, String> {
-
- @Override
- public void startBundle(Context c) {
- }
-
- @Override
- public void processElement(ProcessContext c) {
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
- @Override
- public void processElement(ProcessContext c) {
- CoGbkResult value = c.element().getValue();
- String key = c.element().getKey();
- String countTag1 = tag1.getId() + ": ";
- String countTag2 = tag2.getId() + ": ";
- for (Long count : value.getAll(tag1)) {
- countTag1 += count + " ";
- }
- for (Long count : value.getAll(tag2)) {
- countTag2 += count;
- }
- c.output(key + " -> " + countTag1 + countTag2);
- }
- }
+ static final String[] WORDS_1 = new String[] {
+ "hi there", "hi", "hi sue bob",
+ "hi sue", "", "bob hi"};
+
+ static final String[] WORDS_2 = new String[] {
+ "hi tim", "beauty", "hooray sue bob",
+ "hi there", "", "please say hi"};
+
+ static final String[] RESULTS = new String[] {
+ "beauty -> Tag1: Tag2: 1",
+ "bob -> Tag1: 2 Tag2: 1",
+ "hi -> Tag1: 5 Tag2: 3",
+ "hooray -> Tag1: Tag2: 1",
+ "please -> Tag1: Tag2: 1",
+ "say -> Tag1: Tag2: 1",
+ "sue -> Tag1: 2 Tag2: 1",
+ "there -> Tag1: 1 Tag2: 1",
+ "tim -> Tag1: Tag2: 1"
+ };
+
+ static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
+ static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
+
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ Pipeline p = FlinkTestPipeline.createForBatch();
+
+ /* Create two PCollections and join them */
+ PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Count.<String>perElement());
+
+ PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Count.<String>perElement());
+
+ /* CoGroup the two collections */
+ PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
+ .of(tag1, occurences1)
+ .and(tag2, occurences2)
+ .apply(CoGroupByKey.<String>create());
+
+ /* Format output */
+ mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
+ .apply(TextIO.Write.named("test").to(resultPath));
+
+ p.run();
+ }
+
+
+ static class ExtractWordsFn extends DoFn<String, String> {
+
+ @Override
+ public void startBundle(Context c) {
+ }
+
+ @Override
+ public void processElement(ProcessContext c) {
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ CoGbkResult value = c.element().getValue();
+ String key = c.element().getKey();
+ String countTag1 = tag1.getId() + ": ";
+ String countTag2 = tag2.getId() + ": ";
+ for (Long count : value.getAll(tag1)) {
+ countTag1 += count + " ";
+ }
+ for (Long count : value.getAll(tag2)) {
+ countTag2 += count;
+ }
+ c.output(key + " -> " + countTag1 + countTag2);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
index e6eddc0..4c8b99b 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
@@ -33,122 +33,122 @@ import org.apache.flink.test.util.JavaProgramTestBase;
public class WordCountJoin3ITCase extends JavaProgramTestBase {
- static final String[] WORDS_1 = new String[] {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
-
- static final String[] WORDS_2 = new String[] {
- "hi tim", "beauty", "hooray sue bob",
- "hi there", "", "please say hi"};
-
- static final String[] WORDS_3 = new String[] {
- "hi stephan", "beauty", "hooray big fabian",
- "hi yo", "", "please say hi"};
-
- static final String[] RESULTS = new String[] {
- "beauty -> Tag1: Tag2: 1 Tag3: 1",
- "bob -> Tag1: 2 Tag2: 1 Tag3: ",
- "hi -> Tag1: 5 Tag2: 3 Tag3: 3",
- "hooray -> Tag1: Tag2: 1 Tag3: 1",
- "please -> Tag1: Tag2: 1 Tag3: 1",
- "say -> Tag1: Tag2: 1 Tag3: 1",
- "sue -> Tag1: 2 Tag2: 1 Tag3: ",
- "there -> Tag1: 1 Tag2: 1 Tag3: ",
- "tim -> Tag1: Tag2: 1 Tag3: ",
- "stephan -> Tag1: Tag2: Tag3: 1",
- "yo -> Tag1: Tag2: Tag3: 1",
- "fabian -> Tag1: Tag2: Tag3: 1",
- "big -> Tag1: Tag2: Tag3: 1"
- };
-
- static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
- static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
- static final TupleTag<Long> tag3 = new TupleTag<>("Tag3");
-
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
-
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- /* Create two PCollections and join them */
- PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Count.<String>perElement());
-
- PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Count.<String>perElement());
-
- PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Count.<String>perElement());
-
- /* CoGroup the two collections */
- PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
- .of(tag1, occurences1)
- .and(tag2, occurences2)
- .and(tag3, occurences3)
- .apply(CoGroupByKey.<String>create());
-
- /* Format output */
- mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
- .apply(TextIO.Write.named("test").to(resultPath));
-
- p.run();
- }
-
-
- static class ExtractWordsFn extends DoFn<String, String> {
-
- @Override
- public void startBundle(Context c) {
- }
-
- @Override
- public void processElement(ProcessContext c) {
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
- @Override
- public void processElement(ProcessContext c) {
- CoGbkResult value = c.element().getValue();
- String key = c.element().getKey();
- String countTag1 = tag1.getId() + ": ";
- String countTag2 = tag2.getId() + ": ";
- String countTag3 = tag3.getId() + ": ";
- for (Long count : value.getAll(tag1)) {
- countTag1 += count + " ";
- }
- for (Long count : value.getAll(tag2)) {
- countTag2 += count + " ";
- }
- for (Long count : value.getAll(tag3)) {
- countTag3 += count;
- }
- c.output(key + " -> " + countTag1 + countTag2 + countTag3);
- }
- }
+ static final String[] WORDS_1 = new String[] {
+ "hi there", "hi", "hi sue bob",
+ "hi sue", "", "bob hi"};
+
+ static final String[] WORDS_2 = new String[] {
+ "hi tim", "beauty", "hooray sue bob",
+ "hi there", "", "please say hi"};
+
+ static final String[] WORDS_3 = new String[] {
+ "hi stephan", "beauty", "hooray big fabian",
+ "hi yo", "", "please say hi"};
+
+ static final String[] RESULTS = new String[] {
+ "beauty -> Tag1: Tag2: 1 Tag3: 1",
+ "bob -> Tag1: 2 Tag2: 1 Tag3: ",
+ "hi -> Tag1: 5 Tag2: 3 Tag3: 3",
+ "hooray -> Tag1: Tag2: 1 Tag3: 1",
+ "please -> Tag1: Tag2: 1 Tag3: 1",
+ "say -> Tag1: Tag2: 1 Tag3: 1",
+ "sue -> Tag1: 2 Tag2: 1 Tag3: ",
+ "there -> Tag1: 1 Tag2: 1 Tag3: ",
+ "tim -> Tag1: Tag2: 1 Tag3: ",
+ "stephan -> Tag1: Tag2: Tag3: 1",
+ "yo -> Tag1: Tag2: Tag3: 1",
+ "fabian -> Tag1: Tag2: Tag3: 1",
+ "big -> Tag1: Tag2: Tag3: 1"
+ };
+
+ static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
+ static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
+ static final TupleTag<Long> tag3 = new TupleTag<>("Tag3");
+
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+
+ Pipeline p = FlinkTestPipeline.createForBatch();
+
+ /* Create three PCollections and join them */
+ PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Count.<String>perElement());
+
+ PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Count.<String>perElement());
+
+ PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3))
+ .apply(ParDo.of(new ExtractWordsFn()))
+ .apply(Count.<String>perElement());
+
+ /* CoGroup the three collections */
+ PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
+ .of(tag1, occurences1)
+ .and(tag2, occurences2)
+ .and(tag3, occurences3)
+ .apply(CoGroupByKey.<String>create());
+
+ /* Format output */
+ mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
+ .apply(TextIO.Write.named("test").to(resultPath));
+
+ p.run();
+ }
+
+
+ static class ExtractWordsFn extends DoFn<String, String> {
+
+ @Override
+ public void startBundle(Context c) {
+ }
+
+ @Override
+ public void processElement(ProcessContext c) {
+ // Split the line into words.
+ String[] words = c.element().split("[^a-zA-Z']+");
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ CoGbkResult value = c.element().getValue();
+ String key = c.element().getKey();
+ String countTag1 = tag1.getId() + ": ";
+ String countTag2 = tag2.getId() + ": ";
+ String countTag3 = tag3.getId() + ": ";
+ for (Long count : value.getAll(tag1)) {
+ countTag1 += count + " ";
+ }
+ for (Long count : value.getAll(tag2)) {
+ countTag2 += count + " ";
+ }
+ for (Long count : value.getAll(tag3)) {
+ countTag3 += count;
+ }
+ c.output(key + " -> " + countTag1 + countTag2 + countTag3);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
index 865fc5f..a61bf52 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
@@ -39,118 +39,118 @@ import static org.junit.Assert.*;
*/
public class WriteSinkITCase extends JavaProgramTestBase {
- protected String resultPath;
+ protected String resultPath;
- public WriteSinkITCase(){
- }
+ public WriteSinkITCase(){
+ }
- static final String[] EXPECTED_RESULT = new String[] {
- "Joe red 3", "Mary blue 4", "Max yellow 23"};
+ static final String[] EXPECTED_RESULT = new String[] {
+ "Joe red 3", "Mary blue 4", "Max yellow 23"};
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- runProgram(resultPath);
- }
-
- private static void runProgram(String resultPath) {
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
- .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
-
- p.run();
- }
-
- /**
- * Simple custom sink which writes to a file.
- */
- private static class MyCustomSink extends Sink<String> {
-
- private final String resultPath;
-
- public MyCustomSink(String resultPath) {
- this.resultPath = resultPath;
- }
-
- @Override
- public void validate(PipelineOptions options) {
- assertNotNull(options);
- }
-
- @Override
- public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
- return new MyWriteOperation();
- }
-
- private class MyWriteOperation extends WriteOperation<String, String> {
-
- @Override
- public Coder<String> getWriterResultCoder() {
- return StringUtf8Coder.of();
- }
-
- @Override
- public void initialize(PipelineOptions options) throws Exception {
-
- }
-
- @Override
- public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
-
- }
-
- @Override
- public Writer<String, String> createWriter(PipelineOptions options) throws Exception {
- return new MyWriter();
- }
-
- @Override
- public Sink<String> getSink() {
- return MyCustomSink.this;
- }
-
- /**
- * Simple Writer which writes to a file.
- */
- private class MyWriter extends Writer<String, String> {
-
- private PrintWriter internalWriter;
-
- @Override
- public void open(String uId) throws Exception {
- Path path = new Path(resultPath + "/" + uId);
- FileSystem.get(new URI("file:///")).create(path, false);
- internalWriter = new PrintWriter(new File(path.toUri()));
- }
-
- @Override
- public void write(String value) throws Exception {
- internalWriter.println(value);
- }
-
- @Override
- public String close() throws Exception {
- internalWriter.close();
- return resultPath;
- }
-
- @Override
- public WriteOperation<String, String> getWriteOperation() {
- return MyWriteOperation.this;
- }
- }
- }
- }
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ runProgram(resultPath);
+ }
+
+ private static void runProgram(String resultPath) {
+ Pipeline p = FlinkTestPipeline.createForBatch();
+
+ p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
+ .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
+
+ p.run();
+ }
+
+ /**
+ * Simple custom sink which writes to a file.
+ */
+ private static class MyCustomSink extends Sink<String> {
+
+ private final String resultPath;
+
+ public MyCustomSink(String resultPath) {
+ this.resultPath = resultPath;
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ assertNotNull(options);
+ }
+
+ @Override
+ public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
+ return new MyWriteOperation();
+ }
+
+ private class MyWriteOperation extends WriteOperation<String, String> {
+
+ @Override
+ public Coder<String> getWriterResultCoder() {
+ return StringUtf8Coder.of();
+ }
+
+ @Override
+ public void initialize(PipelineOptions options) throws Exception {
+
+ }
+
+ @Override
+ public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
+
+ }
+
+ @Override
+ public Writer<String, String> createWriter(PipelineOptions options) throws Exception {
+ return new MyWriter();
+ }
+
+ @Override
+ public Sink<String> getSink() {
+ return MyCustomSink.this;
+ }
+
+ /**
+ * Simple Writer which writes to a file.
+ */
+ private class MyWriter extends Writer<String, String> {
+
+ private PrintWriter internalWriter;
+
+ @Override
+ public void open(String uId) throws Exception {
+ Path path = new Path(resultPath + "/" + uId);
+ FileSystem.get(new URI("file:///")).create(path, false);
+ internalWriter = new PrintWriter(new File(path.toUri()));
+ }
+
+ @Override
+ public void write(String value) throws Exception {
+ internalWriter.println(value);
+ }
+
+ @Override
+ public String close() throws Exception {
+ internalWriter.close();
+ return resultPath;
+ }
+
+ @Override
+ public WriteOperation<String, String> getWriteOperation() {
+ return MyWriteOperation.this;
+ }
+ }
+ }
+ }
}