You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:36 UTC

[41/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
index 1525c80..9f602fd 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
@@ -25,103 +25,103 @@ import java.util.concurrent.TimeUnit;
 
 public class StateCheckpointWriter {
 
-	private final AbstractStateBackend.CheckpointStateOutputView output;
-
-	public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) {
-		return new StateCheckpointWriter(output);
-	}
-
-	private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) {
-		this.output = output;
-	}
-
-	/////////			Creating the serialized versions of the different types of state held by dataflow			///////
-
-	public StateCheckpointWriter addValueBuilder() throws IOException {
-		validate();
-		StateType.serialize(StateType.VALUE, this);
-		return this;
-	}
-
-	public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
-		validate();
-		StateType.serialize(StateType.WATERMARK, this);
-		return this;
-	}
-
-	public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
-		validate();
-		StateType.serialize(StateType.LIST, this);
-		return this;
-	}
-
-	public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
-		validate();
-		StateType.serialize(StateType.ACCUMULATOR, this);
-		return this;
-	}
-
-	/////////			Setting the tag for a given state element			///////
-
-	public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
-		return writeData(stateKey.toByteArray());
-	}
-
-	public StateCheckpointWriter setTag(String stateKey) throws IOException {
-		output.writeUTF(stateKey);
-		return this;
-	}
-
-
-	public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException {
-		return serializeObject(key, keySerializer);
-	}
-
-	public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException {
-		objectSerializer.serialize(object, output);
-		return this;
-	}
-
-	/////////			Write the actual serialized data			//////////
-
-	public StateCheckpointWriter setData(ByteString data) throws IOException {
-		return writeData(data.toByteArray());
-	}
-
-	public StateCheckpointWriter setData(byte[] data) throws IOException {
-		return writeData(data);
-	}
-
-	public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException {
-		validate();
-		output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
-		return this;
-	}
-
-	public StateCheckpointWriter writeInt(int number) throws IOException {
-		validate();
-		output.writeInt(number);
-		return this;
-	}
-
-	public StateCheckpointWriter writeByte(byte b) throws IOException {
-		validate();
-		output.writeByte(b);
-		return this;
-	}
-
-	/////////			Helper Methods			///////
-
-	private StateCheckpointWriter writeData(byte[] data) throws IOException {
-		validate();
-		output.writeInt(data.length);
-		output.write(data);
-		return this;
-	}
-
-	private void validate() {
-		if (this.output == null) {
-			throw new RuntimeException("StateBackend not initialized yet.");
-		}
-	}
+  private final AbstractStateBackend.CheckpointStateOutputView output;
+
+  public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) {
+    return new StateCheckpointWriter(output);
+  }
+
+  private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) {
+    this.output = output;
+  }
+
+  /////////      Creating the serialized versions of the different types of state held by dataflow      ///////
+
+  public StateCheckpointWriter addValueBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.VALUE, this);
+    return this;
+  }
+
+  public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.WATERMARK, this);
+    return this;
+  }
+
+  public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.LIST, this);
+    return this;
+  }
+
+  public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.ACCUMULATOR, this);
+    return this;
+  }
+
+  /////////      Setting the tag for a given state element      ///////
+
+  public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
+    return writeData(stateKey.toByteArray());
+  }
+
+  public StateCheckpointWriter setTag(String stateKey) throws IOException {
+    output.writeUTF(stateKey);
+    return this;
+  }
+
+
+  public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException {
+    return serializeObject(key, keySerializer);
+  }
+
+  public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException {
+    objectSerializer.serialize(object, output);
+    return this;
+  }
+
+  /////////      Write the actual serialized data      //////////
+
+  public StateCheckpointWriter setData(ByteString data) throws IOException {
+    return writeData(data.toByteArray());
+  }
+
+  public StateCheckpointWriter setData(byte[] data) throws IOException {
+    return writeData(data);
+  }
+
+  public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException {
+    validate();
+    output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
+    return this;
+  }
+
+  public StateCheckpointWriter writeInt(int number) throws IOException {
+    validate();
+    output.writeInt(number);
+    return this;
+  }
+
+  public StateCheckpointWriter writeByte(byte b) throws IOException {
+    validate();
+    output.writeByte(b);
+    return this;
+  }
+
+  /////////      Helper Methods      ///////
+
+  private StateCheckpointWriter writeData(byte[] data) throws IOException {
+    validate();
+    output.writeInt(data.length);
+    output.write(data);
+    return this;
+  }
+
+  private void validate() {
+    if (this.output == null) {
+      throw new RuntimeException("StateBackend not initialized yet.");
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
index aa049ef..9e2c9f8 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
@@ -23,49 +23,49 @@ import java.io.IOException;
  * */
 public enum StateType {
 
-	VALUE(0),
+  VALUE(0),
 
-	WATERMARK(1),
+  WATERMARK(1),
 
-	LIST(2),
+  LIST(2),
 
-	ACCUMULATOR(3);
+  ACCUMULATOR(3);
 
-	private final int numVal;
+  private final int numVal;
 
-	StateType(int value) {
-		this.numVal = value;
-	}
+  StateType(int value) {
+    this.numVal = value;
+  }
 
-	public static void serialize(StateType type, StateCheckpointWriter output) throws IOException {
-		if (output == null) {
-			throw new IllegalArgumentException("Cannot write to a null output.");
-		}
+  public static void serialize(StateType type, StateCheckpointWriter output) throws IOException {
+    if (output == null) {
+      throw new IllegalArgumentException("Cannot write to a null output.");
+    }
 
-		if(type.numVal < 0 || type.numVal > 3) {
-			throw new RuntimeException("Unknown State Type " + type + ".");
-		}
+    if(type.numVal < 0 || type.numVal > 3) {
+      throw new RuntimeException("Unknown State Type " + type + ".");
+    }
 
-		output.writeByte((byte) type.numVal);
-	}
+    output.writeByte((byte) type.numVal);
+  }
 
-	public static StateType deserialize(StateCheckpointReader input) throws IOException {
-		if (input == null) {
-			throw new IllegalArgumentException("Cannot read from a null input.");
-		}
+  public static StateType deserialize(StateCheckpointReader input) throws IOException {
+    if (input == null) {
+      throw new IllegalArgumentException("Cannot read from a null input.");
+    }
 
-		int typeInt = (int) input.getByte();
-		if(typeInt < 0 || typeInt > 3) {
-			throw new RuntimeException("Unknown State Type " + typeInt + ".");
-		}
+    int typeInt = (int) input.getByte();
+    if(typeInt < 0 || typeInt > 3) {
+      throw new RuntimeException("Unknown State Type " + typeInt + ".");
+    }
 
-		StateType resultType = null;
-		for(StateType st: values()) {
-			if(st.numVal == typeInt) {
-				resultType = st;
-				break;
-			}
-		}
-		return resultType;
-	}
+    StateType resultType = null;
+    for(StateType st: values()) {
+      if(st.numVal == typeInt) {
+        resultType = st;
+        break;
+      }
+    }
+    return resultType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
index ce53d44..3272975 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
@@ -30,70 +30,70 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 
 public class AvroITCase extends JavaProgramTestBase {
 
-	protected String resultPath;
-	protected String tmpPath;
-
-	public AvroITCase(){
-	}
-
-	static final String[] EXPECTED_RESULT = new String[] {
-			"Joe red 3",
-			"Mary blue 4",
-			"Mark green 1",
-			"Julia purple 5"
-	};
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-		tmpPath = getTempDirPath("tmp");
-
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		runProgram(tmpPath, resultPath);
-	}
-
-	private static void runProgram(String tmpPath, String resultPath) {
-		Pipeline p = FlinkTestPipeline.createForBatch();
-
-		p
-			.apply(Create.of(
-					new User("Joe", 3, "red"),
-					new User("Mary", 4, "blue"),
-					new User("Mark", 1, "green"),
-					new User("Julia", 5, "purple"))
-				.withCoder(AvroCoder.of(User.class)))
-
-			.apply(AvroIO.Write.to(tmpPath)
-				.withSchema(User.class));
-
-		p.run();
-
-		p = FlinkTestPipeline.createForBatch();
-
-		p
-			.apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation())
-
-				.apply(ParDo.of(new DoFn<User, String>() {
-					@Override
-					public void processElement(ProcessContext c) throws Exception {
-						User u = c.element();
-						String result = u.getName() + " " + u.getFavoriteColor() + " " + u.getFavoriteNumber();
-						c.output(result);
-					}
-				}))
-
-			.apply(TextIO.Write.to(resultPath));
-
-		p.run();
-	}
+  protected String resultPath;
+  protected String tmpPath;
+
+  public AvroITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "Joe red 3",
+      "Mary blue 4",
+      "Mark green 1",
+      "Julia purple 5"
+  };
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+    tmpPath = getTempDirPath("tmp");
+
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(tmpPath, resultPath);
+  }
+
+  private static void runProgram(String tmpPath, String resultPath) {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    p
+      .apply(Create.of(
+          new User("Joe", 3, "red"),
+          new User("Mary", 4, "blue"),
+          new User("Mark", 1, "green"),
+          new User("Julia", 5, "purple"))
+        .withCoder(AvroCoder.of(User.class)))
+
+      .apply(AvroIO.Write.to(tmpPath)
+        .withSchema(User.class));
+
+    p.run();
+
+    p = FlinkTestPipeline.createForBatch();
+
+    p
+      .apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation())
+
+        .apply(ParDo.of(new DoFn<User, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            User u = c.element();
+            String result = u.getName() + " " + u.getFavoriteColor() + " " + u.getFavoriteNumber();
+            c.output(result);
+          }
+        }))
+
+      .apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
index 928388c..e65e497 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
@@ -26,47 +26,47 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 
 public class FlattenizeITCase extends JavaProgramTestBase {
 
-	private String resultPath;
-	private String resultPath2;
+  private String resultPath;
+  private String resultPath2;
 
-	private static final String[] words = {"hello", "this", "is", "a", "DataSet!"};
-	private static final String[] words2 = {"hello", "this", "is", "another", "DataSet!"};
-	private static final String[] words3 = {"hello", "this", "is", "yet", "another", "DataSet!"};
+  private static final String[] words = {"hello", "this", "is", "a", "DataSet!"};
+  private static final String[] words2 = {"hello", "this", "is", "another", "DataSet!"};
+  private static final String[] words3 = {"hello", "this", "is", "yet", "another", "DataSet!"};
 
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-		resultPath2 = getTempDirPath("result2");
-	}
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+    resultPath2 = getTempDirPath("result2");
+  }
 
-	@Override
-	protected void postSubmit() throws Exception {
-		String join = Joiner.on('\n').join(words);
-		String join2 = Joiner.on('\n').join(words2);
-		String join3 = Joiner.on('\n').join(words3);
-		compareResultsByLinesInMemory(join + "\n" + join2, resultPath);
-		compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, resultPath2);
-	}
+  @Override
+  protected void postSubmit() throws Exception {
+    String join = Joiner.on('\n').join(words);
+    String join2 = Joiner.on('\n').join(words2);
+    String join3 = Joiner.on('\n').join(words3);
+    compareResultsByLinesInMemory(join + "\n" + join2, resultPath);
+    compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, resultPath2);
+  }
 
 
-	@Override
-	protected void testProgram() throws Exception {
-		Pipeline p = FlinkTestPipeline.createForBatch();
+  @Override
+  protected void testProgram() throws Exception {
+    Pipeline p = FlinkTestPipeline.createForBatch();
 
-		PCollection<String> p1 = p.apply(Create.of(words));
-		PCollection<String> p2 = p.apply(Create.of(words2));
+    PCollection<String> p1 = p.apply(Create.of(words));
+    PCollection<String> p2 = p.apply(Create.of(words2));
 
-		PCollectionList<String> list = PCollectionList.of(p1).and(p2);
+    PCollectionList<String> list = PCollectionList.of(p1).and(p2);
 
-		list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath));
+    list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath));
 
-		PCollection<String> p3 = p.apply(Create.of(words3));
+    PCollection<String> p3 = p.apply(Create.of(words3));
 
-		PCollectionList<String> list2 = list.and(p3);
+    PCollectionList<String> list2 = list.and(p3);
 
-		list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2));
+    list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2));
 
-		p.run();
-	}
+    p.run();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index 59c3b69..578e0e1 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -26,45 +26,45 @@ import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
  */
 public class FlinkTestPipeline extends Pipeline {
 
-	/**
-	 * Creates and returns a new test pipeline for batch execution.
-	 *
-	 * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
-	 * {@link Pipeline#run} to execute the pipeline and check the tests.
-	 */
-	public static FlinkTestPipeline createForBatch() {
-		return create(false);
-	}
+  /**
+   * Creates and returns a new test pipeline for batch execution.
+   *
+   * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   */
+  public static FlinkTestPipeline createForBatch() {
+    return create(false);
+  }
 
-	/**
-	 * Creates and returns a new test pipeline for streaming execution.
-	 *
-	 * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
-	 * {@link Pipeline#run} to execute the pipeline and check the tests.
-	 *
-	 * @return The Test Pipeline
-	 */
-	public static FlinkTestPipeline createForStreaming() {
-		return create(true);
-	}
+  /**
+   * Creates and returns a new test pipeline for streaming execution.
+   *
+   * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   *
+   * @return The Test Pipeline
+   */
+  public static FlinkTestPipeline createForStreaming() {
+    return create(true);
+  }
 
-	/**
-	 * Creates and returns a new test pipeline for streaming or batch execution.
-	 *
-	 * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
-	 * {@link Pipeline#run} to execute the pipeline and check the tests.
-	 *
-	 * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch.
-	 * @return The Test Pipeline.
-	 */
-	private static FlinkTestPipeline create(boolean streaming) {
-		FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
-		return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
-	}
+  /**
+   * Creates and returns a new test pipeline for streaming or batch execution.
+   *
+   * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   *
+   * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch.
+   * @return The Test Pipeline.
+   */
+  private static FlinkTestPipeline create(boolean streaming) {
+    FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
+    return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
+  }
 
-	private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
-							PipelineOptions options) {
-		super(runner, options);
-	}
+  private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
+              PipelineOptions options) {
+    super(runner, options);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
index af0f217..28861ea 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
@@ -34,66 +34,66 @@ import java.util.List;
  */
 public class JoinExamplesITCase extends JavaProgramTestBase {
 
-	protected String resultPath;
-
-	public JoinExamplesITCase(){
-	}
-
-	private static final TableRow row1 = new TableRow()
-			.set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
-			.set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com");
-	private static final TableRow row2 = new TableRow()
-			.set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
-			.set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com");
-	private static final TableRow row3 = new TableRow()
-			.set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213")
-			.set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com");
-	static final TableRow[] EVENTS = new TableRow[] {
-			row1, row2, row3
-	};
-	static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
-
-	private static final TableRow cc1 = new TableRow()
-			.set("FIPSCC", "VM").set("HumanName", "Vietnam");
-	private static final TableRow cc2 = new TableRow()
-			.set("FIPSCC", "BE").set("HumanName", "Belgium");
-	static final TableRow[] CCS = new TableRow[] {
-			cc1, cc2
-	};
-	static final List<TableRow> CC_ARRAY = Arrays.asList(CCS);
-
-	static final String[] JOINED_EVENTS = new String[] {
-			"Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, "
-					+ "url: http://www.chicagotribune.com",
-			"Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, "
-					+ "url: http://cnn.com",
-			"Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, "
-					+ "url: http://cnn.com"
-	};
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-
-		Pipeline p = FlinkTestPipeline.createForBatch();
-
-		PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
-		PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));
-
-		PCollection<String> output = JoinExamples.joinEvents(input1, input2);
-
-		output.apply(TextIO.Write.to(resultPath));
-
-		p.run();
-	}
+  protected String resultPath;
+
+  public JoinExamplesITCase(){
+  }
+
+  private static final TableRow row1 = new TableRow()
+      .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
+      .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com");
+  private static final TableRow row2 = new TableRow()
+      .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
+      .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com");
+  private static final TableRow row3 = new TableRow()
+      .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213")
+      .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com");
+  static final TableRow[] EVENTS = new TableRow[] {
+      row1, row2, row3
+  };
+  static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
+
+  private static final TableRow cc1 = new TableRow()
+      .set("FIPSCC", "VM").set("HumanName", "Vietnam");
+  private static final TableRow cc2 = new TableRow()
+      .set("FIPSCC", "BE").set("HumanName", "Belgium");
+  static final TableRow[] CCS = new TableRow[] {
+      cc1, cc2
+  };
+  static final List<TableRow> CC_ARRAY = Arrays.asList(CCS);
+
+  static final String[] JOINED_EVENTS = new String[] {
+      "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, "
+          + "url: http://www.chicagotribune.com",
+      "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, "
+          + "url: http://cnn.com",
+      "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, "
+          + "url: http://cnn.com"
+  };
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
+    PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));
+
+    PCollection<String> output = JoinExamples.joinEvents(input1, input2);
+
+    output.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
index 35f2eaf..d1652e7 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
@@ -27,37 +27,37 @@ import java.io.Serializable;
 
 public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Serializable {
 
-	protected String resultPath;
-
-	protected final String expected = "test";
-
-	public MaybeEmptyTestITCase() {
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-
-		Pipeline p = FlinkTestPipeline.createForBatch();
-
-		p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
-				.apply(ParDo.of(
-						new DoFn<Void, String>() {
-							@Override
-							public void processElement(DoFn<Void, String>.ProcessContext c) {
-								c.output(expected);
-							}
-						})).apply(TextIO.Write.to(resultPath));
-		p.run();
-	}
+  protected String resultPath;
+
+  protected final String expected = "test";
+
+  public MaybeEmptyTestITCase() {
+  }
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(expected, resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
+        .apply(ParDo.of(
+            new DoFn<Void, String>() {
+              @Override
+              public void processElement(DoFn<Void, String>.ProcessContext c) {
+                c.output(expected);
+              }
+            })).apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
index ccdbbf9..d8087d6 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
@@ -31,68 +31,68 @@ import java.io.Serializable;
 
 public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Serializable {
 
-	private String resultPath;
-
-	private static String[] expectedWords = {"MAAA", "MAAFOOO"};
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		Pipeline p = FlinkTestPipeline.createForBatch();
-
-		PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
-
-		// Select words whose length is below a cut off,
-		// plus the lengths of words that are above the cut off.
-		// Also select words starting with "MARKER".
-		final int wordLengthCutOff = 3;
-		// Create tags to use for the main and side outputs.
-		final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
-		final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){};
-		final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
-
-		PCollectionTuple results =
-				words.apply(ParDo
-						.withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag)
-								.and(markedWordsTag))
-						.of(new DoFn<String, String>() {
-							final TupleTag<String> specialWordsTag = new TupleTag<String>() {
-							};
-
-							public void processElement(ProcessContext c) {
-								String word = c.element();
-								if (word.length() <= wordLengthCutOff) {
-									c.output(word);
-								} else {
-									c.sideOutput(wordLengthsAboveCutOffTag, word.length());
-								}
-								if (word.startsWith("MAA")) {
-									c.sideOutput(markedWordsTag, word);
-								}
-
-								if (word.startsWith("SPECIAL")) {
-									c.sideOutput(specialWordsTag, word);
-								}
-							}
-						}));
-
-		// Extract the PCollection results, by tag.
-		PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
-		PCollection<Integer> wordLengthsAboveCutOff = results.get
-				(wordLengthsAboveCutOffTag);
-		PCollection<String> markedWords = results.get(markedWordsTag);
-
-		markedWords.apply(TextIO.Write.to(resultPath));
-
-		p.run();
-	}
+  private String resultPath;
+
+  private static String[] expectedWords = {"MAAA", "MAAFOOO"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
+
+    // Select words whose length is below a cut off,
+    // plus the lengths of words that are above the cut off.
+    // Also select words starting with "MARKER".
+    final int wordLengthCutOff = 3;
+    // Create tags to use for the main and side outputs.
+    final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
+    final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){};
+    final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
+
+    PCollectionTuple results =
+        words.apply(ParDo
+            .withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag)
+                .and(markedWordsTag))
+            .of(new DoFn<String, String>() {
+              final TupleTag<String> specialWordsTag = new TupleTag<String>() {
+              };
+
+              public void processElement(ProcessContext c) {
+                String word = c.element();
+                if (word.length() <= wordLengthCutOff) {
+                  c.output(word);
+                } else {
+                  c.sideOutput(wordLengthsAboveCutOffTag, word.length());
+                }
+                if (word.startsWith("MAA")) {
+                  c.sideOutput(markedWordsTag, word);
+                }
+
+                if (word.startsWith("SPECIAL")) {
+                  c.sideOutput(specialWordsTag, word);
+                }
+              }
+            }));
+
+    // Extract the PCollection results, by tag.
+    PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
+    PCollection<Integer> wordLengthsAboveCutOff = results.get
+        (wordLengthsAboveCutOffTag);
+    PCollection<String> markedWords = results.get(markedWordsTag);
+
+    markedWords.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
index 3569a78..5a46359 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
@@ -36,128 +36,128 @@ import java.util.List;
 
 public class ReadSourceITCase extends JavaProgramTestBase {
 
-	protected String resultPath;
-
-	public ReadSourceITCase(){
-	}
-
-	static final String[] EXPECTED_RESULT = new String[] {
-			"1", "2", "3", "4", "5", "6", "7", "8", "9"};
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		runProgram(resultPath);
-	}
-
-	private static void runProgram(String resultPath) {
-
-		Pipeline p = FlinkTestPipeline.createForBatch();
-
-		PCollection<String> result = p
-				.apply(Read.from(new ReadSource(1, 10)))
-				.apply(ParDo.of(new DoFn<Integer, String>() {
-					@Override
-					public void processElement(ProcessContext c) throws Exception {
-						c.output(c.element().toString());
-					}
-				}));
-
-		result.apply(TextIO.Write.to(resultPath));
-		p.run();
-	}
-
-
-	private static class ReadSource extends BoundedSource<Integer> {
-		final int from;
-		final int to;
-
-		ReadSource(int from, int to) {
-			this.from = from;
-			this.to = to;
-		}
-
-		@Override
-		public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options)
-				throws Exception {
-			List<ReadSource> res = new ArrayList<>();
-			FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
-			int numWorkers = flinkOptions.getParallelism();
-			Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0.");
-
-			float step = 1.0f * (to - from) / numWorkers;
-			for (int i = 0; i < numWorkers; ++i) {
-				res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step)));
-			}
-			return res;
-		}
-
-		@Override
-		public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-			return 8 * (to - from);
-		}
-
-		@Override
-		public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-			return true;
-		}
-
-		@Override
-		public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
-			return new RangeReader(this);
-		}
-
-		@Override
-		public void validate() {}
-
-		@Override
-		public Coder<Integer> getDefaultOutputCoder() {
-			return BigEndianIntegerCoder.of();
-		}
-
-		private class RangeReader extends BoundedReader<Integer> {
-			private int current;
-
-			public RangeReader(ReadSource source) {
-				this.current = source.from - 1;
-			}
-
-			@Override
-			public boolean start() throws IOException {
-				return true;
-			}
-
-			@Override
-			public boolean advance() throws IOException {
-				current++;
-				return (current < to);
-			}
-
-			@Override
-			public Integer getCurrent() {
-				return current;
-			}
-
-			@Override
-			public void close() throws IOException {
-				// Nothing
-			}
-
-			@Override
-			public BoundedSource<Integer> getCurrentSource() {
-				return ReadSource.this;
-			}
-		}
-	}
+  protected String resultPath;
+
+  public ReadSourceITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> result = p
+        .apply(Read.from(new ReadSource(1, 10)))
+        .apply(ParDo.of(new DoFn<Integer, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(c.element().toString());
+          }
+        }));
+
+    result.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
+
+
+  private static class ReadSource extends BoundedSource<Integer> {
+    final int from;
+    final int to;
+
+    ReadSource(int from, int to) {
+      this.from = from;
+      this.to = to;
+    }
+
+    @Override
+    public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options)
+        throws Exception {
+      List<ReadSource> res = new ArrayList<>();
+      FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
+      int numWorkers = flinkOptions.getParallelism();
+      Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0.");
+
+      float step = 1.0f * (to - from) / numWorkers;
+      for (int i = 0; i < numWorkers; ++i) {
+        res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step)));
+      }
+      return res;
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 8 * (to - from);
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return true;
+    }
+
+    @Override
+    public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
+      return new RangeReader(this);
+    }
+
+    @Override
+    public void validate() {}
+
+    @Override
+    public Coder<Integer> getDefaultOutputCoder() {
+      return BigEndianIntegerCoder.of();
+    }
+
+    private class RangeReader extends BoundedReader<Integer> {
+      private int current;
+
+      public RangeReader(ReadSource source) {
+        this.current = source.from - 1;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return true;
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        current++;
+        return (current < to);
+      }
+
+      @Override
+      public Integer getCurrent() {
+        return current;
+      }
+
+      @Override
+      public void close() throws IOException {
+        // Nothing
+      }
+
+      @Override
+      public BoundedSource<Integer> getCurrentSource() {
+        return ReadSource.this;
+      }
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
index db794f7..615f194 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
@@ -30,39 +30,39 @@ import java.util.List;
 
 public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase {
 
-	protected String resultPath;
+  protected String resultPath;
 
-	public RemoveDuplicatesEmptyITCase(){
-	}
+  public RemoveDuplicatesEmptyITCase(){
+  }
 
-	static final String[] EXPECTED_RESULT = new String[] {};
+  static final String[] EXPECTED_RESULT = new String[] {};
 
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-	}
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
 
-	@Override
-	protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
-		List<String> strings = Collections.emptyList();
+    List<String> strings = Collections.emptyList();
 
-		Pipeline p = FlinkTestPipeline.createForBatch();
+    Pipeline p = FlinkTestPipeline.createForBatch();
 
-		PCollection<String> input =
-				p.apply(Create.of(strings))
-						.setCoder(StringUtf8Coder.of());
+    PCollection<String> input =
+        p.apply(Create.of(strings))
+            .setCoder(StringUtf8Coder.of());
 
-		PCollection<String> output =
-				input.apply(RemoveDuplicates.<String>create());
+    PCollection<String> output =
+        input.apply(RemoveDuplicates.<String>create());
 
-		output.apply(TextIO.Write.to(resultPath));
-		p.run();
-	}
+    output.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
index 04e06b8..8c19f2c 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
@@ -30,40 +30,40 @@ import java.util.List;
 
 public class RemoveDuplicatesITCase extends JavaProgramTestBase {
 
-	protected String resultPath;
+  protected String resultPath;
 
-	public RemoveDuplicatesITCase(){
-	}
+  public RemoveDuplicatesITCase(){
+  }
 
-	static final String[] EXPECTED_RESULT = new String[] {
-			"k1", "k5", "k2", "k3"};
+  static final String[] EXPECTED_RESULT = new String[] {
+      "k1", "k5", "k2", "k3"};
 
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-	}
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
 
-	@Override
-	protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
-		List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");
+    List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");
 
-		Pipeline p = FlinkTestPipeline.createForBatch();
+    Pipeline p = FlinkTestPipeline.createForBatch();
 
-		PCollection<String> input =
-				p.apply(Create.of(strings))
-						.setCoder(StringUtf8Coder.of());
+    PCollection<String> input =
+        p.apply(Create.of(strings))
+            .setCoder(StringUtf8Coder.of());
 
-		PCollection<String> output =
-				input.apply(RemoveDuplicates.<String>create());
+    PCollection<String> output =
+        input.apply(RemoveDuplicates.<String>create());
 
-		output.apply(TextIO.Write.to(resultPath));
-		p.run();
-	}
+    output.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
index ee8843c..7c3d6f9 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
@@ -28,40 +28,40 @@ import java.io.Serializable;
 
 public class SideInputITCase extends JavaProgramTestBase implements Serializable {
 
-	private static final String expected = "Hello!";
+  private static final String expected = "Hello!";
 
-	protected String resultPath;
+  protected String resultPath;
 
-	@Override
-	protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
 
-		Pipeline p = FlinkTestPipeline.createForBatch();
+    Pipeline p = FlinkTestPipeline.createForBatch();
 
 
-		final PCollectionView<String> sidesInput = p
-				.apply(Create.of(expected))
-				.apply(View.<String>asSingleton());
+    final PCollectionView<String> sidesInput = p
+        .apply(Create.of(expected))
+        .apply(View.<String>asSingleton());
 
-		p.apply(Create.of("bli"))
-				.apply(ParDo.of(new DoFn<String, String>() {
-					@Override
-					public void processElement(ProcessContext c) throws Exception {
-						String s = c.sideInput(sidesInput);
-						c.output(s);
-					}
-				}).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath));
+    p.apply(Create.of("bli"))
+        .apply(ParDo.of(new DoFn<String, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            String s = c.sideInput(sidesInput);
+            c.output(s);
+          }
+        }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath));
 
-		p.run();
-	}
+    p.run();
+  }
 
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(expected, resultPath);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
index 07c1294..715d0be 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
@@ -32,45 +32,45 @@ import java.net.URI;
 
 public class TfIdfITCase extends JavaProgramTestBase {
 
-	protected String resultPath;
+  protected String resultPath;
 
-	public TfIdfITCase(){
-	}
+  public TfIdfITCase(){
+  }
 
-	static final String[] EXPECTED_RESULT = new String[] {
-			"a", "m", "n", "b", "c", "d"};
+  static final String[] EXPECTED_RESULT = new String[] {
+      "a", "m", "n", "b", "c", "d"};
 
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-	}
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
 
-	@Override
-	protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
-		Pipeline pipeline = FlinkTestPipeline.createForBatch();
+    Pipeline pipeline = FlinkTestPipeline.createForBatch();
 
-		pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
 
-		PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
-				.apply(Create.of(
-						KV.of(new URI("x"), "a b c d"),
-						KV.of(new URI("y"), "a b c"),
-						KV.of(new URI("z"), "a m n")))
-				.apply(new TFIDF.ComputeTfIdf());
+    PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
+        .apply(Create.of(
+            KV.of(new URI("x"), "a b c d"),
+            KV.of(new URI("y"), "a b c"),
+            KV.of(new URI("z"), "a m n")))
+        .apply(new TFIDF.ComputeTfIdf());
 
-		PCollection<String> words = wordToUriAndTfIdf
-				.apply(Keys.<String>create())
-				.apply(RemoveDuplicates.<String>create());
+    PCollection<String> words = wordToUriAndTfIdf
+        .apply(Keys.<String>create())
+        .apply(RemoveDuplicates.<String>create());
 
-		words.apply(TextIO.Write.to(resultPath));
+    words.apply(TextIO.Write.to(resultPath));
 
-		pipeline.run();
-	}
+    pipeline.run();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
index 9188097..f1a2454 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
@@ -32,43 +32,43 @@ import java.util.List;
 
 public class WordCountITCase extends JavaProgramTestBase {
 
-	protected String resultPath;
+  protected String resultPath;
 
-	public WordCountITCase(){
-	}
+  public WordCountITCase(){
+  }
 
-	static final String[] WORDS_ARRAY = new String[] {
-			"hi there", "hi", "hi sue bob",
-			"hi sue", "", "bob hi"};
+  static final String[] WORDS_ARRAY = new String[] {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
 
-	static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
 
-	static final String[] COUNTS_ARRAY = new String[] {
-			"hi: 5", "there: 1", "sue: 2", "bob: 2"};
+  static final String[] COUNTS_ARRAY = new String[] {
+      "hi: 5", "there: 1", "sue: 2", "bob: 2"};
 
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath);
-	}
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath);
+  }
 
-	@Override
-	protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
-		Pipeline p = FlinkTestPipeline.createForBatch();
+    Pipeline p = FlinkTestPipeline.createForBatch();
 
-		PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+    PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
 
-		input
-				.apply(new WordCount.CountWords())
-				.apply(MapElements.via(new WordCount.FormatAsTextFn()))
-				.apply(TextIO.Write.to(resultPath));
+    input
+        .apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+        .apply(TextIO.Write.to(resultPath));
 
-		p.run();
-	}
+    p.run();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
index ccc52c4..1cac036 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
@@ -33,104 +33,104 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 
 public class WordCountJoin2ITCase extends JavaProgramTestBase {
 
-	static final String[] WORDS_1 = new String[] {
-			"hi there", "hi", "hi sue bob",
-			"hi sue", "", "bob hi"};
-
-	static final String[] WORDS_2 = new String[] {
-			"hi tim", "beauty", "hooray sue bob",
-			"hi there", "", "please say hi"};
-
-	static final String[] RESULTS = new String[] {
-			"beauty -> Tag1: Tag2: 1",
-			"bob -> Tag1: 2 Tag2: 1",
-			"hi -> Tag1: 5 Tag2: 3",
-			"hooray -> Tag1: Tag2: 1",
-			"please -> Tag1: Tag2: 1",
-			"say -> Tag1: Tag2: 1",
-			"sue -> Tag1: 2 Tag2: 1",
-			"there -> Tag1: 1 Tag2: 1",
-			"tim -> Tag1: Tag2: 1"
-	};
-
-	static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
-	static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
-
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		Pipeline p = FlinkTestPipeline.createForBatch();
-
-		/* Create two PCollections and join them */
-		PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
-				.apply(ParDo.of(new ExtractWordsFn()))
-				.apply(Count.<String>perElement());
-
-		PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
-				.apply(ParDo.of(new ExtractWordsFn()))
-				.apply(Count.<String>perElement());
-
-		/* CoGroup the two collections */
-		PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
-				.of(tag1, occurences1)
-				.and(tag2, occurences2)
-				.apply(CoGroupByKey.<String>create());
-
-		/* Format output */
-		mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
-				.apply(TextIO.Write.named("test").to(resultPath));
-
-		p.run();
-	}
-
-
-	static class ExtractWordsFn extends DoFn<String, String> {
-
-		@Override
-		public void startBundle(Context c) {
-		}
-
-		@Override
-		public void processElement(ProcessContext c) {
-			// Split the line into words.
-			String[] words = c.element().split("[^a-zA-Z']+");
-
-			// Output each word encountered into the output PCollection.
-			for (String word : words) {
-				if (!word.isEmpty()) {
-					c.output(word);
-				}
-			}
-		}
-	}
-
-	static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
-		@Override
-		public void processElement(ProcessContext c) {
-			CoGbkResult value = c.element().getValue();
-			String key = c.element().getKey();
-			String countTag1 = tag1.getId() + ": ";
-			String countTag2 = tag2.getId() + ": ";
-			for (Long count : value.getAll(tag1)) {
-				countTag1 += count + " ";
-			}
-			for (Long count : value.getAll(tag2)) {
-				countTag2 += count;
-			}
-			c.output(key + " -> " + countTag1 + countTag2);
-		}
-	}
+  static final String[] WORDS_1 = new String[] {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+
+  static final String[] WORDS_2 = new String[] {
+      "hi tim", "beauty", "hooray sue bob",
+      "hi there", "", "please say hi"};
+
+  static final String[] RESULTS = new String[] {
+      "beauty -> Tag1: Tag2: 1",
+      "bob -> Tag1: 2 Tag2: 1",
+      "hi -> Tag1: 5 Tag2: 3",
+      "hooray -> Tag1: Tag2: 1",
+      "please -> Tag1: Tag2: 1",
+      "say -> Tag1: Tag2: 1",
+      "sue -> Tag1: 2 Tag2: 1",
+      "there -> Tag1: 1 Tag2: 1",
+      "tim -> Tag1: Tag2: 1"
+  };
+
+  static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
+  static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
+
+  protected String resultPath;
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    /* Create two PCollections and join them */
+    PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    /* CoGroup the two collections */
+    PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
+        .of(tag1, occurences1)
+        .and(tag2, occurences2)
+        .apply(CoGroupByKey.<String>create());
+
+    /* Format output */
+    mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
+        .apply(TextIO.Write.named("test").to(resultPath));
+
+    p.run();
+  }
+
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+
+    @Override
+    public void startBundle(Context c) {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      CoGbkResult value = c.element().getValue();
+      String key = c.element().getKey();
+      String countTag1 = tag1.getId() + ": ";
+      String countTag2 = tag2.getId() + ": ";
+      for (Long count : value.getAll(tag1)) {
+        countTag1 += count + " ";
+      }
+      for (Long count : value.getAll(tag2)) {
+        countTag2 += count;
+      }
+      c.output(key + " -> " + countTag1 + countTag2);
+    }
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
index e6eddc0..4c8b99b 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
@@ -33,122 +33,122 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 
 public class WordCountJoin3ITCase extends JavaProgramTestBase {
 
-	static final String[] WORDS_1 = new String[] {
-			"hi there", "hi", "hi sue bob",
-			"hi sue", "", "bob hi"};
-
-	static final String[] WORDS_2 = new String[] {
-			"hi tim", "beauty", "hooray sue bob",
-			"hi there", "", "please say hi"};
-
-	static final String[] WORDS_3 = new String[] {
-			"hi stephan", "beauty", "hooray big fabian",
-			"hi yo", "", "please say hi"};
-
-	static final String[] RESULTS = new String[] {
-			"beauty -> Tag1: Tag2: 1 Tag3: 1",
-			"bob -> Tag1: 2 Tag2: 1 Tag3: ",
-			"hi -> Tag1: 5 Tag2: 3 Tag3: 3",
-			"hooray -> Tag1: Tag2: 1 Tag3: 1",
-			"please -> Tag1: Tag2: 1 Tag3: 1",
-			"say -> Tag1: Tag2: 1 Tag3: 1",
-			"sue -> Tag1: 2 Tag2: 1 Tag3: ",
-			"there -> Tag1: 1 Tag2: 1 Tag3: ",
-			"tim -> Tag1: Tag2: 1 Tag3: ",
-			"stephan -> Tag1: Tag2: Tag3: 1",
-			"yo -> Tag1: Tag2: Tag3: 1",
-			"fabian -> Tag1: Tag2: Tag3: 1",
-			"big -> Tag1: Tag2: Tag3: 1"
-	};
-
-	static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
-	static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
-	static final TupleTag<Long> tag3 = new TupleTag<>("Tag3");
-
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-
-		Pipeline p = FlinkTestPipeline.createForBatch();
-
-		/* Create two PCollections and join them */
-		PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
-				.apply(ParDo.of(new ExtractWordsFn()))
-				.apply(Count.<String>perElement());
-
-		PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
-				.apply(ParDo.of(new ExtractWordsFn()))
-				.apply(Count.<String>perElement());
-
-		PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3))
-				.apply(ParDo.of(new ExtractWordsFn()))
-				.apply(Count.<String>perElement());
-
-		/* CoGroup the two collections */
-		PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
-				.of(tag1, occurences1)
-				.and(tag2, occurences2)
-				.and(tag3, occurences3)
-				.apply(CoGroupByKey.<String>create());
-
-		/* Format output */
-		mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
-				.apply(TextIO.Write.named("test").to(resultPath));
-
-		p.run();
-	}
-
-
-	static class ExtractWordsFn extends DoFn<String, String> {
-
-		@Override
-		public void startBundle(Context c) {
-		}
-
-		@Override
-		public void processElement(ProcessContext c) {
-			// Split the line into words.
-			String[] words = c.element().split("[^a-zA-Z']+");
-
-			// Output each word encountered into the output PCollection.
-			for (String word : words) {
-				if (!word.isEmpty()) {
-					c.output(word);
-				}
-			}
-		}
-	}
-
-	static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
-		@Override
-		public void processElement(ProcessContext c) {
-			CoGbkResult value = c.element().getValue();
-			String key = c.element().getKey();
-			String countTag1 = tag1.getId() + ": ";
-			String countTag2 = tag2.getId() + ": ";
-			String countTag3 = tag3.getId() + ": ";
-			for (Long count : value.getAll(tag1)) {
-				countTag1 += count + " ";
-			}
-			for (Long count : value.getAll(tag2)) {
-				countTag2 += count + " ";
-			}
-			for (Long count : value.getAll(tag3)) {
-				countTag3 += count;
-			}
-			c.output(key + " -> " + countTag1 + countTag2 + countTag3);
-		}
-	}
+  static final String[] WORDS_1 = new String[] {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+
+  static final String[] WORDS_2 = new String[] {
+      "hi tim", "beauty", "hooray sue bob",
+      "hi there", "", "please say hi"};
+
+  static final String[] WORDS_3 = new String[] {
+      "hi stephan", "beauty", "hooray big fabian",
+      "hi yo", "", "please say hi"};
+
+  static final String[] RESULTS = new String[] {
+      "beauty -> Tag1: Tag2: 1 Tag3: 1",
+      "bob -> Tag1: 2 Tag2: 1 Tag3: ",
+      "hi -> Tag1: 5 Tag2: 3 Tag3: 3",
+      "hooray -> Tag1: Tag2: 1 Tag3: 1",
+      "please -> Tag1: Tag2: 1 Tag3: 1",
+      "say -> Tag1: Tag2: 1 Tag3: 1",
+      "sue -> Tag1: 2 Tag2: 1 Tag3: ",
+      "there -> Tag1: 1 Tag2: 1 Tag3: ",
+      "tim -> Tag1: Tag2: 1 Tag3: ",
+      "stephan -> Tag1: Tag2: Tag3: 1",
+      "yo -> Tag1: Tag2: Tag3: 1",
+      "fabian -> Tag1: Tag2: Tag3: 1",
+      "big -> Tag1: Tag2: Tag3: 1"
+  };
+
+  static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
+  static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
+  static final TupleTag<Long> tag3 = new TupleTag<>("Tag3");
+
+  protected String resultPath;
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    /* Create two PCollections and join them */
+    PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    /* CoGroup the two collections */
+    PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
+        .of(tag1, occurences1)
+        .and(tag2, occurences2)
+        .and(tag3, occurences3)
+        .apply(CoGroupByKey.<String>create());
+
+    /* Format output */
+    mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
+        .apply(TextIO.Write.named("test").to(resultPath));
+
+    p.run();
+  }
+
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+
+    @Override
+    public void startBundle(Context c) {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      CoGbkResult value = c.element().getValue();
+      String key = c.element().getKey();
+      String countTag1 = tag1.getId() + ": ";
+      String countTag2 = tag2.getId() + ": ";
+      String countTag3 = tag3.getId() + ": ";
+      for (Long count : value.getAll(tag1)) {
+        countTag1 += count + " ";
+      }
+      for (Long count : value.getAll(tag2)) {
+        countTag2 += count + " ";
+      }
+      for (Long count : value.getAll(tag3)) {
+        countTag3 += count;
+      }
+      c.output(key + " -> " + countTag1 + countTag2 + countTag3);
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
index 865fc5f..a61bf52 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
@@ -39,118 +39,118 @@ import static org.junit.Assert.*;
  */
 public class WriteSinkITCase extends JavaProgramTestBase {
 
-	protected String resultPath;
+  protected String resultPath;
 
-	public WriteSinkITCase(){
-	}
+  public WriteSinkITCase(){
+  }
 
-	static final String[] EXPECTED_RESULT = new String[] {
-			"Joe red 3", "Mary blue 4", "Max yellow 23"};
+  static final String[] EXPECTED_RESULT = new String[] {
+      "Joe red 3", "Mary blue 4", "Max yellow 23"};
 
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		runProgram(resultPath);
-	}
-
-	private static void runProgram(String resultPath) {
-		Pipeline p = FlinkTestPipeline.createForBatch();
-
-		p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
-			.apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
-
-		p.run();
-	}
-
-	/**
-	 * Simple custom sink which writes to a file.
-	 */
-	private static class MyCustomSink extends Sink<String> {
-
-		private final String resultPath;
-
-		public MyCustomSink(String resultPath) {
-			this.resultPath = resultPath;
-		}
-
-		@Override
-		public void validate(PipelineOptions options) {
-			assertNotNull(options);
-		}
-
-		@Override
-		public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
-			return new MyWriteOperation();
-		}
-
-		private class MyWriteOperation extends WriteOperation<String, String> {
-
-			@Override
-			public Coder<String> getWriterResultCoder() {
-				return StringUtf8Coder.of();
-			}
-
-			@Override
-			public void initialize(PipelineOptions options) throws Exception {
-
-			}
-
-			@Override
-			public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
-
-			}
-
-			@Override
-			public Writer<String, String> createWriter(PipelineOptions options) throws Exception {
-				return new MyWriter();
-			}
-
-			@Override
-			public Sink<String> getSink() {
-				return MyCustomSink.this;
-			}
-
-			/**
-			 * Simple Writer which writes to a file.
-			 */
-			private class MyWriter extends Writer<String, String> {
-
-				private PrintWriter internalWriter;
-
-				@Override
-				public void open(String uId) throws Exception {
-					Path path = new Path(resultPath + "/" + uId);
-					FileSystem.get(new URI("file:///")).create(path, false);
-					internalWriter = new PrintWriter(new File(path.toUri()));
-				}
-
-				@Override
-				public void write(String value) throws Exception {
-					internalWriter.println(value);
-				}
-
-				@Override
-				public String close() throws Exception {
-					internalWriter.close();
-					return resultPath;
-				}
-
-				@Override
-				public WriteOperation<String, String> getWriteOperation() {
-					return MyWriteOperation.this;
-				}
-			}
-		}
-	}
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
+      .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
+
+    p.run();
+  }
+
+  /**
+   * Simple custom sink which writes to a file.
+   */
+  private static class MyCustomSink extends Sink<String> {
+
+    private final String resultPath;
+
+    public MyCustomSink(String resultPath) {
+      this.resultPath = resultPath;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      assertNotNull(options);
+    }
+
+    @Override
+    public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
+      return new MyWriteOperation();
+    }
+
+    private class MyWriteOperation extends WriteOperation<String, String> {
+
+      @Override
+      public Coder<String> getWriterResultCoder() {
+        return StringUtf8Coder.of();
+      }
+
+      @Override
+      public void initialize(PipelineOptions options) throws Exception {
+
+      }
+
+      @Override
+      public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
+
+      }
+
+      @Override
+      public Writer<String, String> createWriter(PipelineOptions options) throws Exception {
+        return new MyWriter();
+      }
+
+      @Override
+      public Sink<String> getSink() {
+        return MyCustomSink.this;
+      }
+
+      /**
+       * Simple Writer which writes to a file.
+       */
+      private class MyWriter extends Writer<String, String> {
+
+        private PrintWriter internalWriter;
+
+        @Override
+        public void open(String uId) throws Exception {
+          Path path = new Path(resultPath + "/" + uId);
+          FileSystem.get(new URI("file:///")).create(path, false);
+          internalWriter = new PrintWriter(new File(path.toUri()));
+        }
+
+        @Override
+        public void write(String value) throws Exception {
+          internalWriter.println(value);
+        }
+
+        @Override
+        public String close() throws Exception {
+          internalWriter.close();
+          return resultPath;
+        }
+
+        @Override
+        public WriteOperation<String, String> getWriteOperation() {
+          return MyWriteOperation.this;
+        }
+      }
+    }
+  }
 
 }