You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:37 UTC

[42/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
index dd14f68..0b1a5da 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -41,191 +41,191 @@ import static com.google.common.base.Preconditions.checkArgument;
  * */
 public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> {
 
-	private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
+  private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
 
-	private static final long serialVersionUID = 1L;
-
-	private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
-
-	private static final int CONNECTION_TIMEOUT_TIME = 0;
-
-	private final String hostname;
-	private final int port;
-	private final char delimiter;
-	private final long maxNumRetries;
-	private final long delayBetweenRetries;
-
-	public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
-		this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
-	}
-
-	public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
-		this.hostname = hostname;
-		this.port = port;
-		this.delimiter = delimiter;
-		this.maxNumRetries = maxNumRetries;
-		this.delayBetweenRetries = delayBetweenRetries;
-	}
-
-	public String getHostname() {
-		return this.hostname;
-	}
-
-	public int getPort() {
-		return this.port;
-	}
-
-	public char getDelimiter() {
-		return this.delimiter;
-	}
-
-	public long getMaxNumRetries() {
-		return this.maxNumRetries;
-	}
-
-	public long getDelayBetweenRetries() {
-		return this.delayBetweenRetries;
-	}
-
-	@Override
-	public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
-		return Collections.<UnboundedSource<String, C>>singletonList(this);
-	}
-
-	@Override
-	public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
-		return new UnboundedSocketReader(this);
-	}
-
-	@Nullable
-	@Override
-	public Coder getCheckpointMarkCoder() {
-		// Flink and Dataflow have different checkpointing mechanisms.
-		// In our case we do not need a coder.
-		return null;
-	}
-
-	@Override
-	public void validate() {
-		checkArgument(port > 0 && port < 65536, "port is out of range");
-		checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
-		checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
-	}
-
-	@Override
-	public Coder getDefaultOutputCoder() {
-		return DEFAULT_SOCKET_CODER;
-	}
-
-	public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
-
-		private static final long serialVersionUID = 7526472295622776147L;
-		private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
-
-		private final UnboundedSocketSource source;
-
-		private Socket socket;
-		private BufferedReader reader;
-
-		private boolean isRunning;
-
-		private String currentRecord;
-
-		public UnboundedSocketReader(UnboundedSocketSource source) {
-			this.source = source;
-		}
-
-		private void openConnection() throws IOException {
-			this.socket = new Socket();
-			this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
-			this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
-			this.isRunning = true;
-		}
-
-		@Override
-		public boolean start() throws IOException {
-			int attempt = 0;
-			while (!isRunning) {
-				try {
-					openConnection();
-					LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
-
-					return advance();
-				} catch (IOException e) {
-					LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
-
-					if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
-						try {
-							Thread.sleep(this.source.getDelayBetweenRetries());
-						} catch (InterruptedException e1) {
-							e1.printStackTrace();
-						}
-					} else {
-						this.isRunning = false;
-						break;
-					}
-				}
-			}
-			LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
-			return false;
-		}
-
-		@Override
-		public boolean advance() throws IOException {
-			final StringBuilder buffer = new StringBuilder();
-			int data;
-			while (isRunning && (data = reader.read()) != -1) {
-				// check if the string is complete
-				if (data != this.source.getDelimiter()) {
-					buffer.append((char) data);
-				} else {
-					if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
-						buffer.setLength(buffer.length() - 1);
-					}
-					this.currentRecord = buffer.toString();
-					buffer.setLength(0);
-					return true;
-				}
-			}
-			return false;
-		}
-
-		@Override
-		public byte[] getCurrentRecordId() throws NoSuchElementException {
-			return new byte[0];
-		}
-
-		@Override
-		public String getCurrent() throws NoSuchElementException {
-			return this.currentRecord;
-		}
-
-		@Override
-		public Instant getCurrentTimestamp() throws NoSuchElementException {
-			return Instant.now();
-		}
-
-		@Override
-		public void close() throws IOException {
-			this.reader.close();
-			this.socket.close();
-			this.isRunning = false;
-			LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
-		}
-
-		@Override
-		public Instant getWatermark() {
-			return Instant.now();
-		}
-
-		@Override
-		public CheckpointMark getCheckpointMark() {
-			return null;
-		}
-
-		@Override
-		public UnboundedSource<String, ?> getCurrentSource() {
-			return this.source;
-		}
-	}
+  private static final long serialVersionUID = 1L;
+
+  private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
+
+  private static final int CONNECTION_TIMEOUT_TIME = 0;
+
+  private final String hostname;
+  private final int port;
+  private final char delimiter;
+  private final long maxNumRetries;
+  private final long delayBetweenRetries;
+
+  public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
+    this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
+  }
+
+  public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
+    this.hostname = hostname;
+    this.port = port;
+    this.delimiter = delimiter;
+    this.maxNumRetries = maxNumRetries;
+    this.delayBetweenRetries = delayBetweenRetries;
+  }
+
+  public String getHostname() {
+    return this.hostname;
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public char getDelimiter() {
+    return this.delimiter;
+  }
+
+  public long getMaxNumRetries() {
+    return this.maxNumRetries;
+  }
+
+  public long getDelayBetweenRetries() {
+    return this.delayBetweenRetries;
+  }
+
+  @Override
+  public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+    return Collections.<UnboundedSource<String, C>>singletonList(this);
+  }
+
+  @Override
+  public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+    return new UnboundedSocketReader(this);
+  }
+
+  @Nullable
+  @Override
+  public Coder getCheckpointMarkCoder() {
+    // Flink and Dataflow have different checkpointing mechanisms.
+    // In our case we do not need a coder.
+    return null;
+  }
+
+  @Override
+  public void validate() {
+    checkArgument(port > 0 && port < 65536, "port is out of range");
+    checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
+    checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
+  }
+
+  @Override
+  public Coder getDefaultOutputCoder() {
+    return DEFAULT_SOCKET_CODER;
+  }
+
+  public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
+
+    private static final long serialVersionUID = 7526472295622776147L;
+    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
+
+    private final UnboundedSocketSource source;
+
+    private Socket socket;
+    private BufferedReader reader;
+
+    private boolean isRunning;
+
+    private String currentRecord;
+
+    public UnboundedSocketReader(UnboundedSocketSource source) {
+      this.source = source;
+    }
+
+    private void openConnection() throws IOException {
+      this.socket = new Socket();
+      this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
+      this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
+      this.isRunning = true;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      int attempt = 0;
+      while (!isRunning) {
+        try {
+          openConnection();
+          LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
+
+          return advance();
+        } catch (IOException e) {
+          LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
+
+          if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
+            try {
+              Thread.sleep(this.source.getDelayBetweenRetries());
+            } catch (InterruptedException e1) {
+              e1.printStackTrace();
+            }
+          } else {
+            this.isRunning = false;
+            break;
+          }
+        }
+      }
+      LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
+      return false;
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      final StringBuilder buffer = new StringBuilder();
+      int data;
+      while (isRunning && (data = reader.read()) != -1) {
+        // check if the string is complete
+        if (data != this.source.getDelimiter()) {
+          buffer.append((char) data);
+        } else {
+          if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
+            buffer.setLength(buffer.length() - 1);
+          }
+          this.currentRecord = buffer.toString();
+          buffer.setLength(0);
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public byte[] getCurrentRecordId() throws NoSuchElementException {
+      return new byte[0];
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      return this.currentRecord;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return Instant.now();
+    }
+
+    @Override
+    public void close() throws IOException {
+      this.reader.close();
+      this.socket.close();
+      this.isRunning = false;
+      LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return Instant.now();
+    }
+
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return null;
+    }
+
+    @Override
+    public UnboundedSource<String, ?> getCurrentSource() {
+      return this.source;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index e065f87..5a89894 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -38,95 +38,95 @@ import org.joda.time.Instant;
  * */
 public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {
 
-	private final String name;
-	private final UnboundedSource.UnboundedReader<T> reader;
-
-	private StreamingRuntimeContext runtime = null;
-	private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
-
-	private volatile boolean isRunning = false;
-
-	public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
-		this.name = transform.getName();
-		this.reader = transform.getSource().createReader(options, null);
-	}
-
-	public String getName() {
-		return this.name;
-	}
-
-	WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
-		if (timestamp == null) {
-			timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-		}
-		return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-	}
-
-	@Override
-	public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
-		if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
-			throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
-					"Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
-		}
-
-		context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
-		runtime = (StreamingRuntimeContext) getRuntimeContext();
-
-		this.isRunning = true;
-		boolean inputAvailable = reader.start();
-
-		setNextWatermarkTimer(this.runtime);
-
-		while (isRunning) {
-
-			while (!inputAvailable && isRunning) {
-				// wait a bit until we retry to pull more records
-				Thread.sleep(50);
-				inputAvailable = reader.advance();
-			}
-
-			if (inputAvailable) {
-
-				// get it and its timestamp from the source
-				T item = reader.getCurrent();
-				Instant timestamp = reader.getCurrentTimestamp();
-
-				// write it to the output collector
-				synchronized (ctx.getCheckpointLock()) {
-					context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
-				}
-
-				inputAvailable = reader.advance();
-			}
-
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-
-	@Override
-	public void trigger(long timestamp) throws Exception {
-		if (this.isRunning) {
-			synchronized (context.getCheckpointLock()) {
-				long watermarkMillis = this.reader.getWatermark().getMillis();
-				context.emitWatermark(new Watermark(watermarkMillis));
-			}
-			setNextWatermarkTimer(this.runtime);
-		}
-	}
-
-	private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
-		if (this.isRunning) {
-			long watermarkInterval =  runtime.getExecutionConfig().getAutoWatermarkInterval();
-			long timeToNextWatermark = getTimeToNextWaternark(watermarkInterval);
-			runtime.registerTimer(timeToNextWatermark, this);
-		}
-	}
-
-	private long getTimeToNextWaternark(long watermarkInterval) {
-		return System.currentTimeMillis() + watermarkInterval;
-	}
+  private final String name;
+  private final UnboundedSource.UnboundedReader<T> reader;
+
+  private StreamingRuntimeContext runtime = null;
+  private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
+
+  private volatile boolean isRunning = false;
+
+  public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
+    this.name = transform.getName();
+    this.reader = transform.getSource().createReader(options, null);
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
+    if (timestamp == null) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+    return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+  }
+
+  @Override
+  public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
+    if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
+      throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
+          "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
+    }
+
+    context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
+    runtime = (StreamingRuntimeContext) getRuntimeContext();
+
+    this.isRunning = true;
+    boolean inputAvailable = reader.start();
+
+    setNextWatermarkTimer(this.runtime);
+
+    while (isRunning) {
+
+      while (!inputAvailable && isRunning) {
+        // wait a bit until we retry to pull more records
+        Thread.sleep(50);
+        inputAvailable = reader.advance();
+      }
+
+      if (inputAvailable) {
+
+        // get it and its timestamp from the source
+        T item = reader.getCurrent();
+        Instant timestamp = reader.getCurrentTimestamp();
+
+        // write it to the output collector
+        synchronized (ctx.getCheckpointLock()) {
+          context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+        }
+
+        inputAvailable = reader.advance();
+      }
+
+    }
+  }
+
+  @Override
+  public void cancel() {
+    isRunning = false;
+  }
+
+  @Override
+  public void trigger(long timestamp) throws Exception {
+    if (this.isRunning) {
+      synchronized (context.getCheckpointLock()) {
+        long watermarkMillis = this.reader.getWatermark().getMillis();
+        context.emitWatermark(new Watermark(watermarkMillis));
+      }
+      setNextWatermarkTimer(this.runtime);
+    }
+  }
+
+  private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
+    if (this.isRunning) {
+      long watermarkInterval =  runtime.getExecutionConfig().getAutoWatermarkInterval();
+      long timeToNextWatermark = getTimeToNextWaternark(watermarkInterval);
+      runtime.registerTimer(timeToNextWatermark, this);
+    }
+  }
+
+  private long getTimeToNextWaternark(long watermarkInterval) {
+    return System.currentTimeMillis() + watermarkInterval;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
index 84a322f..75c8ac6 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
@@ -34,93 +34,93 @@ import java.io.Serializable;
  * The latter is used when snapshots of the current state are taken, for fault-tolerance.
  * */
 public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable {
-	private Instant currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-	private Instant currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-	public void setCurrentInputWatermark(Instant watermark) {
-		checkIfValidInputWatermark(watermark);
-		this.currentInputWatermark = watermark;
-	}
-
-	public void setCurrentOutputWatermark(Instant watermark) {
-		checkIfValidOutputWatermark(watermark);
-		this.currentOutputWatermark = watermark;
-	}
-
-	private void setCurrentInputWatermarkAfterRecovery(Instant watermark) {
-		if (!currentInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
-			throw new RuntimeException("Explicitly setting the input watermark is only allowed on " +
-					"initialization after recovery from a node failure. Apparently this is not " +
-					"the case here as the watermark is already set.");
-		}
-		this.currentInputWatermark = watermark;
-	}
-
-	private void setCurrentOutputWatermarkAfterRecovery(Instant watermark) {
-		if (!currentOutputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
-			throw new RuntimeException("Explicitly setting the output watermark is only allowed on " +
-				"initialization after recovery from a node failure. Apparently this is not " +
-				"the case here as the watermark is already set.");
-		}
-		this.currentOutputWatermark = watermark;
-	}
-
-	@Override
-	public Instant currentProcessingTime() {
-		return Instant.now();
-	}
-
-	@Override
-	public Instant currentInputWatermarkTime() {
-		return currentInputWatermark;
-	}
-
-	@Nullable
-	@Override
-	public Instant currentSynchronizedProcessingTime() {
-		// TODO
-		return null;
-	}
-
-	@Override
-	public Instant currentOutputWatermarkTime() {
-		return currentOutputWatermark;
-	}
-
-	private void checkIfValidInputWatermark(Instant newWatermark) {
-		if (currentInputWatermark.isAfter(newWatermark)) {
-			throw new IllegalArgumentException(String.format(
-					"Cannot set current input watermark to %s. Newer watermarks " +
-							"must be no earlier than the current one (%s).",
-					newWatermark, currentInputWatermark));
-		}
-	}
-
-	private void checkIfValidOutputWatermark(Instant newWatermark) {
-		if (currentOutputWatermark.isAfter(newWatermark)) {
-			throw new IllegalArgumentException(String.format(
-				"Cannot set current output watermark to %s. Newer watermarks " +
-					"must be no earlier than the current one (%s).",
-				newWatermark, currentOutputWatermark));
-		}
-	}
-
-	public void encodeTimerInternals(DoFn.ProcessContext context,
-	                                 StateCheckpointWriter writer,
-	                                 KvCoder<K, VIN> kvCoder,
-	                                 Coder<? extends BoundedWindow> windowCoder) throws IOException {
-		if (context == null) {
-			throw new RuntimeException("The Context has not been initialized.");
-		}
-
-		writer.setTimestamp(currentInputWatermark);
-		writer.setTimestamp(currentOutputWatermark);
-	}
-
-	public void restoreTimerInternals(StateCheckpointReader reader,
-	                                  KvCoder<K, VIN> kvCoder,
-	                                  Coder<? extends BoundedWindow> windowCoder) throws IOException {
-		setCurrentInputWatermarkAfterRecovery(reader.getTimestamp());
-		setCurrentOutputWatermarkAfterRecovery(reader.getTimestamp());
-	}
+  private Instant currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  private Instant currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  public void setCurrentInputWatermark(Instant watermark) {
+    checkIfValidInputWatermark(watermark);
+    this.currentInputWatermark = watermark;
+  }
+
+  public void setCurrentOutputWatermark(Instant watermark) {
+    checkIfValidOutputWatermark(watermark);
+    this.currentOutputWatermark = watermark;
+  }
+
+  private void setCurrentInputWatermarkAfterRecovery(Instant watermark) {
+    if (!currentInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+      throw new RuntimeException("Explicitly setting the input watermark is only allowed on " +
+          "initialization after recovery from a node failure. Apparently this is not " +
+          "the case here as the watermark is already set.");
+    }
+    this.currentInputWatermark = watermark;
+  }
+
+  private void setCurrentOutputWatermarkAfterRecovery(Instant watermark) {
+    if (!currentOutputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+      throw new RuntimeException("Explicitly setting the output watermark is only allowed on " +
+        "initialization after recovery from a node failure. Apparently this is not " +
+        "the case here as the watermark is already set.");
+    }
+    this.currentOutputWatermark = watermark;
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return Instant.now();
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return currentInputWatermark;
+  }
+
+  @Nullable
+  @Override
+  public Instant currentSynchronizedProcessingTime() {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public Instant currentOutputWatermarkTime() {
+    return currentOutputWatermark;
+  }
+
+  private void checkIfValidInputWatermark(Instant newWatermark) {
+    if (currentInputWatermark.isAfter(newWatermark)) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot set current input watermark to %s. Newer watermarks " +
+              "must be no earlier than the current one (%s).",
+          newWatermark, currentInputWatermark));
+    }
+  }
+
+  private void checkIfValidOutputWatermark(Instant newWatermark) {
+    if (currentOutputWatermark.isAfter(newWatermark)) {
+      throw new IllegalArgumentException(String.format(
+        "Cannot set current output watermark to %s. Newer watermarks " +
+          "must be no earlier than the current one (%s).",
+        newWatermark, currentOutputWatermark));
+    }
+  }
+
+  public void encodeTimerInternals(DoFn.ProcessContext context,
+                                   StateCheckpointWriter writer,
+                                   KvCoder<K, VIN> kvCoder,
+                                   Coder<? extends BoundedWindow> windowCoder) throws IOException {
+    if (context == null) {
+      throw new RuntimeException("The Context has not been initialized.");
+    }
+
+    writer.setTimestamp(currentInputWatermark);
+    writer.setTimestamp(currentOutputWatermark);
+  }
+
+  public void restoreTimerInternals(StateCheckpointReader reader,
+                                    KvCoder<K, VIN> kvCoder,
+                                    Coder<? extends BoundedWindow> windowCoder) throws IOException {
+    setCurrentInputWatermarkAfterRecovery(reader.getTimestamp());
+    setCurrentOutputWatermarkAfterRecovery(reader.getTimestamp());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
index 41ab5f0..39fec14 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -41,673 +41,673 @@ import java.util.*;
  */
 public class FlinkStateInternals<K> implements StateInternals<K> {
 
-	private final K key;
-
-	private final Coder<K> keyCoder;
-
-	private final Coder<? extends BoundedWindow> windowCoder;
-
-	private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
-
-	private Instant watermarkHoldAccessor;
-
-	public FlinkStateInternals(K key,
-	                           Coder<K> keyCoder,
-	                           Coder<? extends BoundedWindow> windowCoder,
-	                           OutputTimeFn<? super BoundedWindow> outputTimeFn) {
-		this.key = key;
-		this.keyCoder = keyCoder;
-		this.windowCoder = windowCoder;
-		this.outputTimeFn = outputTimeFn;
-	}
-
-	public Instant getWatermarkHold() {
-		return watermarkHoldAccessor;
-	}
-
-	/**
-	 * This is the interface state has to implement in order for it to be fault tolerant when
-	 * executed by the FlinkPipelineRunner.
-	 */
-	private interface CheckpointableIF {
-
-		boolean shouldPersist();
-
-		void persistState(StateCheckpointWriter checkpointBuilder) throws IOException;
-	}
-
-	protected final StateTable<K> inMemoryState = new StateTable<K>() {
-		@Override
-		protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace, final StateContext<?> c) {
-			return new StateTag.StateBinder<K>() {
-
-				@Override
-				public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
-					return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder);
-				}
-
-				@Override
-				public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
-					return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder);
-				}
-
-				@Override
-				public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-						StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-						Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-					return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
-				}
-
-				@Override
-				public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-						StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-						Coder<AccumT> accumCoder,
-						Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-					return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
-				}
-
-				@Override
-				public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-						StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-						Coder<AccumT> accumCoder,
-						CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-					return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
-				}
-
-				@Override
-				public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
-					return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, address), outputTimeFn);
-				}
-			};
-		}
-	};
-
-	@Override
-	public K getKey() {
-		return key;
-	}
-
-	@Override
-	public <StateT extends State> StateT state(StateNamespace namespace, StateTag<? super K, StateT> address) {
-		return inMemoryState.get(namespace, address, null);
-	}
-
-	@Override
-	public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
-		return inMemoryState.get(namespace, address, c);
-	}
-
-	public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-		checkpointBuilder.writeInt(getNoOfElements());
-
-		for (State location : inMemoryState.values()) {
-			if (!(location instanceof CheckpointableIF)) {
-				throw new IllegalStateException(String.format(
-						"%s wasn't created by %s -- unable to persist it",
-						location.getClass().getSimpleName(),
-						getClass().getSimpleName()));
-			}
-			((CheckpointableIF) location).persistState(checkpointBuilder);
-		}
-	}
-
-	public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader)
-			throws IOException, ClassNotFoundException {
-
-		// the number of elements to read.
-		int noOfElements = checkpointReader.getInt();
-		for (int i = 0; i < noOfElements; i++) {
-			decodeState(checkpointReader, loader);
-		}
-	}
-
-	/**
-	 * We remove the first character which encodes the type of the stateTag ('s' for system
-	 * and 'u' for user). For more details check out the source of
-	 * {@link StateTags.StateTagBase#getId()}.
-	 */
-	private void decodeState(StateCheckpointReader reader, ClassLoader loader)
-			throws IOException, ClassNotFoundException {
-
-		StateType stateItemType = StateType.deserialize(reader);
-		ByteString stateKey = reader.getTag();
-
-		// first decode the namespace and the tagId...
-		String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+");
-		if (namespaceAndTag.length != 2) {
-			throw new IllegalArgumentException("Invalid stateKey " + stateKey.toString() + ".");
-		}
-		StateNamespace namespace = StateNamespaces.fromString(namespaceAndTag[0], windowCoder);
-
-		// ... decide if it is a system or user stateTag...
-		char ownerTag = namespaceAndTag[1].charAt(0);
-		if (ownerTag != 's' && ownerTag != 'u') {
-			throw new RuntimeException("Invalid StateTag name.");
-		}
-		boolean isSystemTag = ownerTag == 's';
-		String tagId = namespaceAndTag[1].substring(1);
-
-		// ...then decode the coder (if there is one)...
-		Coder<?> coder = null;
-		switch (stateItemType) {
-			case VALUE:
-			case LIST:
-			case ACCUMULATOR:
-				ByteString coderBytes = reader.getData();
-				coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader);
-				break;
-			case WATERMARK:
-				break;
-		}
-
-		// ...then decode the combiner function (if there is one)...
-		CombineWithContext.KeyedCombineFnWithContext<? super K, ?, ?, ?> combineFn = null;
-		switch (stateItemType) {
-			case ACCUMULATOR:
-				ByteString combinerBytes = reader.getData();
-				combineFn = InstantiationUtil.deserializeObject(combinerBytes.toByteArray(), loader);
-				break;
-			case VALUE:
-			case LIST:
-			case WATERMARK:
-				break;
-		}
-
-		//... and finally, depending on the type of the state being decoded,
-		// 1) create the adequate stateTag,
-		// 2) create the state container,
-		// 3) restore the actual content.
-		switch (stateItemType) {
-			case VALUE: {
-				StateTag stateTag = StateTags.value(tagId, coder);
-				stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-				@SuppressWarnings("unchecked")
-				FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag, null);
-				value.restoreState(reader);
-				break;
-			}
-			case WATERMARK: {
-				@SuppressWarnings("unchecked")
-				StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = StateTags.watermarkStateInternal(tagId, outputTimeFn);
-				stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-				@SuppressWarnings("unchecked")
-				FlinkWatermarkHoldStateImpl<?> watermark = (FlinkWatermarkHoldStateImpl<?>) inMemoryState.get(namespace, stateTag, null);
-				watermark.restoreState(reader);
-				break;
-			}
-			case LIST: {
-				StateTag stateTag = StateTags.bag(tagId, coder);
-				stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-				FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag, null);
-				bag.restoreState(reader);
-				break;
-			}
-			case ACCUMULATOR: {
-				@SuppressWarnings("unchecked")
-				StateTag<K, AccumulatorCombiningState<?, ?, ?>> stateTag = StateTags.keyedCombiningValueWithContext(tagId, (Coder) coder, combineFn);
-				stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-				@SuppressWarnings("unchecked")
-				FlinkInMemoryKeyedCombiningValue<?, ?, ?> combiningValue =
-						(FlinkInMemoryKeyedCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag, null);
-				combiningValue.restoreState(reader);
-				break;
-			}
-			default:
-				throw new RuntimeException("Unknown State Type " + stateItemType + ".");
-		}
-	}
-
-	private ByteString encodeKey(StateNamespace namespace, StateTag<? super K, ?> address) {
-		StringBuilder sb = new StringBuilder();
-		try {
-			namespace.appendTo(sb);
-			sb.append('+');
-			address.appendTo(sb);
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-		return ByteString.copyFromUtf8(sb.toString());
-	}
-
-	private int getNoOfElements() {
-		int noOfElements = 0;
-		for (State state : inMemoryState.values()) {
-			if (!(state instanceof CheckpointableIF)) {
-				throw new RuntimeException("State Implementations used by the " +
-						"Flink Dataflow Runner should implement the CheckpointableIF interface.");
-			}
-
-			if (((CheckpointableIF) state).shouldPersist()) {
-				noOfElements++;
-			}
-		}
-		return noOfElements;
-	}
-
-	private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF {
-
-		private final ByteString stateKey;
-		private final Coder<T> elemCoder;
-
-		private T value = null;
-
-		public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) {
-			this.stateKey = stateKey;
-			this.elemCoder = elemCoder;
-		}
-
-		@Override
-		public void clear() {
-			value = null;
-		}
-
-		@Override
-		public void write(T input) {
-			this.value = input;
-		}
-
-		@Override
-		public T read() {
-			return value;
-		}
-
-		@Override
-		public ValueState<T> readLater() {
-			// Ignore
-			return this;
-		}
-
-		@Override
-		public boolean shouldPersist() {
-			return value != null;
-		}
-
-		@Override
-		public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-			if (value != null) {
-				// serialize the coder.
-				byte[] coder = InstantiationUtil.serializeObject(elemCoder);
-
-				// encode the value into a ByteString
-				ByteString.Output stream = ByteString.newOutput();
-				elemCoder.encode(value, stream, Coder.Context.OUTER);
-				ByteString data = stream.toByteString();
-
-				checkpointBuilder.addValueBuilder()
-					.setTag(stateKey)
-					.setData(coder)
-					.setData(data);
-			}
-		}
-
-		public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-			ByteString valueContent = checkpointReader.getData();
-			T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
-			write(outValue);
-		}
-	}
-
-	private final class FlinkWatermarkHoldStateImpl<W extends BoundedWindow>
-			implements WatermarkHoldState<W>, CheckpointableIF {
-
-		private final ByteString stateKey;
-
-		private Instant minimumHold = null;
-
-		private OutputTimeFn<? super W> outputTimeFn;
-
-		public FlinkWatermarkHoldStateImpl(ByteString stateKey, OutputTimeFn<? super W> outputTimeFn) {
-			this.stateKey = stateKey;
-			this.outputTimeFn = outputTimeFn;
-		}
-
-		@Override
-		public void clear() {
-			// Even though we're clearing we can't remove this from the in-memory state map, since
-			// other users may already have a handle on this WatermarkBagInternal.
-			minimumHold = null;
-			watermarkHoldAccessor = null;
-		}
-
-		@Override
-		public void add(Instant watermarkHold) {
-			if (minimumHold == null || minimumHold.isAfter(watermarkHold)) {
-				watermarkHoldAccessor = watermarkHold;
-				minimumHold = watermarkHold;
-			}
-		}
-
-		@Override
-		public ReadableState<Boolean> isEmpty() {
-			return new ReadableState<Boolean>() {
-				@Override
-				public Boolean read() {
-					return minimumHold == null;
-				}
-
-				@Override
-				public ReadableState<Boolean> readLater() {
-					// Ignore
-					return this;
-				}
-			};
-		}
-
-		@Override
-		public OutputTimeFn<? super W> getOutputTimeFn() {
-			return outputTimeFn;
-		}
-
-		@Override
-		public Instant read() {
-			return minimumHold;
-		}
-
-		@Override
-		public WatermarkHoldState<W> readLater() {
-			// Ignore
-			return this;
-		}
-
-		@Override
-		public String toString() {
-			return Objects.toString(minimumHold);
-		}
-
-		@Override
-		public boolean shouldPersist() {
-			return minimumHold != null;
-		}
-
-		@Override
-		public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-			if (minimumHold != null) {
-				checkpointBuilder.addWatermarkHoldsBuilder()
-						.setTag(stateKey)
-						.setTimestamp(minimumHold);
-			}
-		}
-
-		public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-			Instant watermark = checkpointReader.getTimestamp();
-			add(watermark);
-		}
-	}
-
-
-	private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withContext(
-			final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-		return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-			@Override
-			public AccumT createAccumulator(K key, CombineWithContext.Context c) {
-				return combineFn.createAccumulator(key);
-			}
-
-			@Override
-			public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
-				return combineFn.addInput(key, accumulator, value);
-			}
-
-			@Override
-			public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
-				return combineFn.mergeAccumulators(key, accumulators);
-			}
-
-			@Override
-			public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
-				return combineFn.extractOutput(key, accumulator);
-			}
-		};
-	}
-
-	private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withKeyAndContext(
-			final Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-		return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-			@Override
-			public AccumT createAccumulator(K key, CombineWithContext.Context c) {
-				return combineFn.createAccumulator();
-			}
-
-			@Override
-			public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
-				return combineFn.addInput(accumulator, value);
-			}
-
-			@Override
-			public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
-				return combineFn.mergeAccumulators(accumulators);
-			}
-
-			@Override
-			public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
-				return combineFn.extractOutput(accumulator);
-			}
-		};
-	}
-
-	private final class FlinkInMemoryKeyedCombiningValue<InputT, AccumT, OutputT>
-			implements AccumulatorCombiningState<InputT, AccumT, OutputT>, CheckpointableIF {
-
-		private final ByteString stateKey;
-		private final CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn;
-		private final Coder<AccumT> accumCoder;
-		private final CombineWithContext.Context context;
-
-		private AccumT accum = null;
-		private boolean isClear = true;
-
-		private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
-		                                         Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-		                                         Coder<AccumT> accumCoder,
-		                                         final StateContext<?> stateContext) {
-			this(stateKey, withKeyAndContext(combineFn), accumCoder, stateContext);
-		}
-
-
-		private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
-		                                         Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
-		                                         Coder<AccumT> accumCoder,
-		                                         final StateContext<?> stateContext) {
-			this(stateKey, withContext(combineFn), accumCoder, stateContext);
-		}
-
-		private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
-		                                         CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn,
-		                                         Coder<AccumT> accumCoder,
-		                                         final StateContext<?> stateContext) {
-			Preconditions.checkNotNull(combineFn);
-			Preconditions.checkNotNull(accumCoder);
-
-			this.stateKey = stateKey;
-			this.combineFn = combineFn;
-			this.accumCoder = accumCoder;
-			this.context = new CombineWithContext.Context() {
-				@Override
-				public PipelineOptions getPipelineOptions() {
-					return stateContext.getPipelineOptions();
-				}
-
-				@Override
-				public <T> T sideInput(PCollectionView<T> view) {
-					return stateContext.sideInput(view);
-				}
-			};
-			accum = combineFn.createAccumulator(key, context);
-		}
-
-		@Override
-		public void clear() {
-			accum = combineFn.createAccumulator(key, context);
-			isClear = true;
-		}
-
-		@Override
-		public void add(InputT input) {
-			isClear = false;
-			accum = combineFn.addInput(key, accum, input, context);
-		}
-
-		@Override
-		public AccumT getAccum() {
-			return accum;
-		}
-
-		@Override
-		public ReadableState<Boolean> isEmpty() {
-			return new ReadableState<Boolean>() {
-				@Override
-				public ReadableState<Boolean> readLater() {
-					// Ignore
-					return this;
-				}
-
-				@Override
-				public Boolean read() {
-					return isClear;
-				}
-			};
-		}
-
-		@Override
-		public void addAccum(AccumT accum) {
-			isClear = false;
-			this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum), context);
-		}
-
-		@Override
-		public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-			return combineFn.mergeAccumulators(key, accumulators, context);
-		}
-
-		@Override
-		public OutputT read() {
-			return combineFn.extractOutput(key, accum, context);
-		}
-
-		@Override
-		public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
-			// Ignore
-			return this;
-		}
-
-		@Override
-		public boolean shouldPersist() {
-			return !isClear;
-		}
-
-		@Override
-		public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-			if (!isClear) {
-				// serialize the coder.
-				byte[] coder = InstantiationUtil.serializeObject(accumCoder);
-
-				// serialize the combiner.
-				byte[] combiner = InstantiationUtil.serializeObject(combineFn);
-
-				// encode the accumulator into a ByteString
-				ByteString.Output stream = ByteString.newOutput();
-				accumCoder.encode(accum, stream, Coder.Context.OUTER);
-				ByteString data = stream.toByteString();
-
-				// put the flag that the next serialized element is an accumulator
-				checkpointBuilder.addAccumulatorBuilder()
-					.setTag(stateKey)
-					.setData(coder)
-					.setData(combiner)
-					.setData(data);
-			}
-		}
-
-		public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-			ByteString valueContent = checkpointReader.getData();
-			AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
-			addAccum(accum);
-		}
-	}
-
-	private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF {
-		private final List<T> contents = new ArrayList<>();
-
-		private final ByteString stateKey;
-		private final Coder<T> elemCoder;
-
-		public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) {
-			this.stateKey = stateKey;
-			this.elemCoder = elemCoder;
-		}
-
-		@Override
-		public void clear() {
-			contents.clear();
-		}
-
-		@Override
-		public Iterable<T> read() {
-			return contents;
-		}
-
-		@Override
-		public BagState<T> readLater() {
-			// Ignore
-			return this;
-		}
-
-		@Override
-		public void add(T input) {
-			contents.add(input);
-		}
-
-		@Override
-		public ReadableState<Boolean> isEmpty() {
-			return new ReadableState<Boolean>() {
-				@Override
-				public ReadableState<Boolean> readLater() {
-					// Ignore
-					return this;
-				}
-
-				@Override
-				public Boolean read() {
-					return contents.isEmpty();
-				}
-			};
-		}
-
-		@Override
-		public boolean shouldPersist() {
-			return !contents.isEmpty();
-		}
-
-		@Override
-		public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-			if (!contents.isEmpty()) {
-				// serialize the coder.
-				byte[] coder = InstantiationUtil.serializeObject(elemCoder);
-
-				checkpointBuilder.addListUpdatesBuilder()
-						.setTag(stateKey)
-						.setData(coder)
-						.writeInt(contents.size());
-
-				for (T item : contents) {
-					// encode the element
-					ByteString.Output stream = ByteString.newOutput();
-					elemCoder.encode(item, stream, Coder.Context.OUTER);
-					ByteString data = stream.toByteString();
-
-					// add the data to the checkpoint.
-					checkpointBuilder.setData(data);
-				}
-			}
-		}
-
-		public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-			int noOfValues = checkpointReader.getInt();
-			for (int j = 0; j < noOfValues; j++) {
-				ByteString valueContent = checkpointReader.getData();
-				T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
-				add(outValue);
-			}
-		}
-	}
+  private final K key;
+
+  private final Coder<K> keyCoder;
+
+  private final Coder<? extends BoundedWindow> windowCoder;
+
+  private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+
+  private Instant watermarkHoldAccessor;
+
+  public FlinkStateInternals(K key,
+                             Coder<K> keyCoder,
+                             Coder<? extends BoundedWindow> windowCoder,
+                             OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+    this.key = key;
+    this.keyCoder = keyCoder;
+    this.windowCoder = windowCoder;
+    this.outputTimeFn = outputTimeFn;
+  }
+
+  public Instant getWatermarkHold() {
+    return watermarkHoldAccessor;
+  }
+
+  /**
+   * This is the interface state has to implement in order for it to be fault tolerant when
+   * executed by the FlinkPipelineRunner.
+   */
+  private interface CheckpointableIF {
+
+    boolean shouldPersist();
+
+    void persistState(StateCheckpointWriter checkpointBuilder) throws IOException;
+  }
+
+  protected final StateTable<K> inMemoryState = new StateTable<K>() {
+    @Override
+    protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace, final StateContext<?> c) {
+      return new StateTag.StateBinder<K>() {
+
+        @Override
+        public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+          return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder);
+        }
+
+        @Override
+        public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+          return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder);
+        }
+
+        @Override
+        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
+        }
+
+        @Override
+        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
+        }
+
+        @Override
+        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
+          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
+        }
+
+        @Override
+        public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
+          return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, address), outputTimeFn);
+        }
+      };
+    }
+  };
+
+  @Override
+  public K getKey() {
+    return key;
+  }
+
+  @Override
+  public <StateT extends State> StateT state(StateNamespace namespace, StateTag<? super K, StateT> address) {
+    return inMemoryState.get(namespace, address, null);
+  }
+
+  @Override
+  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
+    return inMemoryState.get(namespace, address, c);
+  }
+
+  public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+    checkpointBuilder.writeInt(getNoOfElements());
+
+    for (State location : inMemoryState.values()) {
+      if (!(location instanceof CheckpointableIF)) {
+        throw new IllegalStateException(String.format(
+            "%s wasn't created by %s -- unable to persist it",
+            location.getClass().getSimpleName(),
+            getClass().getSimpleName()));
+      }
+      ((CheckpointableIF) location).persistState(checkpointBuilder);
+    }
+  }
+
+  public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader)
+      throws IOException, ClassNotFoundException {
+
+    // the number of elements to read.
+    int noOfElements = checkpointReader.getInt();
+    for (int i = 0; i < noOfElements; i++) {
+      decodeState(checkpointReader, loader);
+    }
+  }
+
+  /**
+   * We remove the first character which encodes the type of the stateTag ('s' for system
+   * and 'u' for user). For more details check out the source of
+   * {@link StateTags.StateTagBase#getId()}.
+   */
+  private void decodeState(StateCheckpointReader reader, ClassLoader loader)
+      throws IOException, ClassNotFoundException {
+
+    StateType stateItemType = StateType.deserialize(reader);
+    ByteString stateKey = reader.getTag();
+
+    // first decode the namespace and the tagId...
+    String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+");
+    if (namespaceAndTag.length != 2) {
+      throw new IllegalArgumentException("Invalid stateKey " + stateKey.toString() + ".");
+    }
+    StateNamespace namespace = StateNamespaces.fromString(namespaceAndTag[0], windowCoder);
+
+    // ... decide if it is a system or user stateTag...
+    char ownerTag = namespaceAndTag[1].charAt(0);
+    if (ownerTag != 's' && ownerTag != 'u') {
+      throw new RuntimeException("Invalid StateTag name.");
+    }
+    boolean isSystemTag = ownerTag == 's';
+    String tagId = namespaceAndTag[1].substring(1);
+
+    // ...then decode the coder (if there is one)...
+    Coder<?> coder = null;
+    switch (stateItemType) {
+      case VALUE:
+      case LIST:
+      case ACCUMULATOR:
+        ByteString coderBytes = reader.getData();
+        coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader);
+        break;
+      case WATERMARK:
+        break;
+    }
+
+    // ...then decode the combiner function (if there is one)...
+    CombineWithContext.KeyedCombineFnWithContext<? super K, ?, ?, ?> combineFn = null;
+    switch (stateItemType) {
+      case ACCUMULATOR:
+        ByteString combinerBytes = reader.getData();
+        combineFn = InstantiationUtil.deserializeObject(combinerBytes.toByteArray(), loader);
+        break;
+      case VALUE:
+      case LIST:
+      case WATERMARK:
+        break;
+    }
+
+    //... and finally, depending on the type of the state being decoded,
+    // 1) create the adequate stateTag,
+    // 2) create the state container,
+    // 3) restore the actual content.
+    switch (stateItemType) {
+      case VALUE: {
+        StateTag stateTag = StateTags.value(tagId, coder);
+        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+        @SuppressWarnings("unchecked")
+        FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag, null);
+        value.restoreState(reader);
+        break;
+      }
+      case WATERMARK: {
+        @SuppressWarnings("unchecked")
+        StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = StateTags.watermarkStateInternal(tagId, outputTimeFn);
+        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+        @SuppressWarnings("unchecked")
+        FlinkWatermarkHoldStateImpl<?> watermark = (FlinkWatermarkHoldStateImpl<?>) inMemoryState.get(namespace, stateTag, null);
+        watermark.restoreState(reader);
+        break;
+      }
+      case LIST: {
+        StateTag stateTag = StateTags.bag(tagId, coder);
+        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+        FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag, null);
+        bag.restoreState(reader);
+        break;
+      }
+      case ACCUMULATOR: {
+        @SuppressWarnings("unchecked")
+        StateTag<K, AccumulatorCombiningState<?, ?, ?>> stateTag = StateTags.keyedCombiningValueWithContext(tagId, (Coder) coder, combineFn);
+        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+        @SuppressWarnings("unchecked")
+        FlinkInMemoryKeyedCombiningValue<?, ?, ?> combiningValue =
+            (FlinkInMemoryKeyedCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag, null);
+        combiningValue.restoreState(reader);
+        break;
+      }
+      default:
+        throw new RuntimeException("Unknown State Type " + stateItemType + ".");
+    }
+  }
+
+  private ByteString encodeKey(StateNamespace namespace, StateTag<? super K, ?> address) {
+    StringBuilder sb = new StringBuilder();
+    try {
+      namespace.appendTo(sb);
+      sb.append('+');
+      address.appendTo(sb);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return ByteString.copyFromUtf8(sb.toString());
+  }
+
+  private int getNoOfElements() {
+    int noOfElements = 0;
+    for (State state : inMemoryState.values()) {
+      if (!(state instanceof CheckpointableIF)) {
+        throw new RuntimeException("State Implementations used by the " +
+            "Flink Dataflow Runner should implement the CheckpointableIF interface.");
+      }
+
+      if (((CheckpointableIF) state).shouldPersist()) {
+        noOfElements++;
+      }
+    }
+    return noOfElements;
+  }
+
+  private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF {
+
+    private final ByteString stateKey;
+    private final Coder<T> elemCoder;
+
+    private T value = null;
+
+    public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) {
+      this.stateKey = stateKey;
+      this.elemCoder = elemCoder;
+    }
+
+    @Override
+    public void clear() {
+      value = null;
+    }
+
+    @Override
+    public void write(T input) {
+      this.value = input;
+    }
+
+    @Override
+    public T read() {
+      return value;
+    }
+
+    @Override
+    public ValueState<T> readLater() {
+      // Ignore
+      return this;
+    }
+
+    @Override
+    public boolean shouldPersist() {
+      return value != null;
+    }
+
+    @Override
+    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+      if (value != null) {
+        // serialize the coder.
+        byte[] coder = InstantiationUtil.serializeObject(elemCoder);
+
+        // encode the value into a ByteString
+        ByteString.Output stream = ByteString.newOutput();
+        elemCoder.encode(value, stream, Coder.Context.OUTER);
+        ByteString data = stream.toByteString();
+
+        checkpointBuilder.addValueBuilder()
+          .setTag(stateKey)
+          .setData(coder)
+          .setData(data);
+      }
+    }
+
+    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+      ByteString valueContent = checkpointReader.getData();
+      T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
+      write(outValue);
+    }
+  }
+
+  private final class FlinkWatermarkHoldStateImpl<W extends BoundedWindow>
+      implements WatermarkHoldState<W>, CheckpointableIF {
+
+    private final ByteString stateKey;
+
+    private Instant minimumHold = null;
+
+    private OutputTimeFn<? super W> outputTimeFn;
+
+    public FlinkWatermarkHoldStateImpl(ByteString stateKey, OutputTimeFn<? super W> outputTimeFn) {
+      this.stateKey = stateKey;
+      this.outputTimeFn = outputTimeFn;
+    }
+
+    @Override
+    public void clear() {
+      // Even though we're clearing we can't remove this from the in-memory state map, since
+      // other users may already have a handle on this WatermarkBagInternal.
+      minimumHold = null;
+      watermarkHoldAccessor = null;
+    }
+
+    @Override
+    public void add(Instant watermarkHold) {
+      if (minimumHold == null || minimumHold.isAfter(watermarkHold)) {
+        watermarkHoldAccessor = watermarkHold;
+        minimumHold = watermarkHold;
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          return minimumHold == null;
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          // Ignore
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
+    }
+
+    @Override
+    public Instant read() {
+      return minimumHold;
+    }
+
+    @Override
+    public WatermarkHoldState<W> readLater() {
+      // Ignore
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toString(minimumHold);
+    }
+
+    @Override
+    public boolean shouldPersist() {
+      return minimumHold != null;
+    }
+
+    @Override
+    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+      if (minimumHold != null) {
+        checkpointBuilder.addWatermarkHoldsBuilder()
+            .setTag(stateKey)
+            .setTimestamp(minimumHold);
+      }
+    }
+
+    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+      Instant watermark = checkpointReader.getTimestamp();
+      add(watermark);
+    }
+  }
+
+
+  private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withContext(
+      final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+    return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
+      @Override
+      public AccumT createAccumulator(K key, CombineWithContext.Context c) {
+        return combineFn.createAccumulator(key);
+      }
+
+      @Override
+      public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
+        return combineFn.addInput(key, accumulator, value);
+      }
+
+      @Override
+      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
+        return combineFn.mergeAccumulators(key, accumulators);
+      }
+
+      @Override
+      public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
+        return combineFn.extractOutput(key, accumulator);
+      }
+    };
+  }
+
+  private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withKeyAndContext(
+      final Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+    return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
+      @Override
+      public AccumT createAccumulator(K key, CombineWithContext.Context c) {
+        return combineFn.createAccumulator();
+      }
+
+      @Override
+      public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
+        return combineFn.addInput(accumulator, value);
+      }
+
+      @Override
+      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
+        return combineFn.mergeAccumulators(accumulators);
+      }
+
+      @Override
+      public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
+        return combineFn.extractOutput(accumulator);
+      }
+    };
+  }
+
+  private final class FlinkInMemoryKeyedCombiningValue<InputT, AccumT, OutputT>
+      implements AccumulatorCombiningState<InputT, AccumT, OutputT>, CheckpointableIF {
+
+    private final ByteString stateKey;
+    private final CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn;
+    private final Coder<AccumT> accumCoder;
+    private final CombineWithContext.Context context;
+
+    private AccumT accum = null;
+    private boolean isClear = true;
+
+    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
+                                             Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
+                                             Coder<AccumT> accumCoder,
+                                             final StateContext<?> stateContext) {
+      this(stateKey, withKeyAndContext(combineFn), accumCoder, stateContext);
+    }
+
+
+    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
+                                             Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
+                                             Coder<AccumT> accumCoder,
+                                             final StateContext<?> stateContext) {
+      this(stateKey, withContext(combineFn), accumCoder, stateContext);
+    }
+
+    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
+                                             CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn,
+                                             Coder<AccumT> accumCoder,
+                                             final StateContext<?> stateContext) {
+      Preconditions.checkNotNull(combineFn);
+      Preconditions.checkNotNull(accumCoder);
+
+      this.stateKey = stateKey;
+      this.combineFn = combineFn;
+      this.accumCoder = accumCoder;
+      this.context = new CombineWithContext.Context() {
+        @Override
+        public PipelineOptions getPipelineOptions() {
+          return stateContext.getPipelineOptions();
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view) {
+          return stateContext.sideInput(view);
+        }
+      };
+      accum = combineFn.createAccumulator(key, context);
+    }
+
+    @Override
+    public void clear() {
+      accum = combineFn.createAccumulator(key, context);
+      isClear = true;
+    }
+
+    @Override
+    public void add(InputT input) {
+      isClear = false;
+      accum = combineFn.addInput(key, accum, input, context);
+    }
+
+    @Override
+    public AccumT getAccum() {
+      return accum;
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          // Ignore
+          return this;
+        }
+
+        @Override
+        public Boolean read() {
+          return isClear;
+        }
+      };
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      isClear = false;
+      this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum), context);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(key, accumulators, context);
+    }
+
+    @Override
+    public OutputT read() {
+      return combineFn.extractOutput(key, accum, context);
+    }
+
+    @Override
+    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+      // Ignore
+      return this;
+    }
+
+    @Override
+    public boolean shouldPersist() {
+      return !isClear;
+    }
+
+    @Override
+    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+      if (!isClear) {
+        // serialize the coder.
+        byte[] coder = InstantiationUtil.serializeObject(accumCoder);
+
+        // serialize the combiner.
+        byte[] combiner = InstantiationUtil.serializeObject(combineFn);
+
+        // encode the accumulator into a ByteString
+        ByteString.Output stream = ByteString.newOutput();
+        accumCoder.encode(accum, stream, Coder.Context.OUTER);
+        ByteString data = stream.toByteString();
+
+        // put the flag that the next serialized element is an accumulator
+        checkpointBuilder.addAccumulatorBuilder()
+          .setTag(stateKey)
+          .setData(coder)
+          .setData(combiner)
+          .setData(data);
+      }
+    }
+
+    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+      ByteString valueContent = checkpointReader.getData();
+      AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
+      addAccum(accum);
+    }
+  }
+
+  private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF {
+    private final List<T> contents = new ArrayList<>();
+
+    private final ByteString stateKey;
+    private final Coder<T> elemCoder;
+
+    public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) {
+      this.stateKey = stateKey;
+      this.elemCoder = elemCoder;
+    }
+
+    @Override
+    public void clear() {
+      contents.clear();
+    }
+
+    @Override
+    public Iterable<T> read() {
+      return contents;
+    }
+
+    @Override
+    public BagState<T> readLater() {
+      // Ignore
+      return this;
+    }
+
+    @Override
+    public void add(T input) {
+      contents.add(input);
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          // Ignore
+          return this;
+        }
+
+        @Override
+        public Boolean read() {
+          return contents.isEmpty();
+        }
+      };
+    }
+
+    @Override
+    public boolean shouldPersist() {
+      return !contents.isEmpty();
+    }
+
+    @Override
+    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+      if (!contents.isEmpty()) {
+        // serialize the coder.
+        byte[] coder = InstantiationUtil.serializeObject(elemCoder);
+
+        checkpointBuilder.addListUpdatesBuilder()
+            .setTag(stateKey)
+            .setData(coder)
+            .writeInt(contents.size());
+
+        for (T item : contents) {
+          // encode the element
+          ByteString.Output stream = ByteString.newOutput();
+          elemCoder.encode(item, stream, Coder.Context.OUTER);
+          ByteString data = stream.toByteString();
+
+          // add the data to the checkpoint.
+          checkpointBuilder.setData(data);
+        }
+      }
+    }
+
+    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+      int noOfValues = checkpointReader.getInt();
+      for (int j = 0; j < noOfValues; j++) {
+        ByteString valueContent = checkpointReader.getData();
+        T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
+        add(outValue);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
index ba8ef89..753309e 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
@@ -25,65 +25,65 @@ import java.util.concurrent.TimeUnit;
 
 public class StateCheckpointReader {
 
-	private final DataInputView input;
-
-	public StateCheckpointReader(DataInputView in) {
-		this.input = in;
-	}
-
-	public ByteString getTag() throws IOException {
-		return ByteString.copyFrom(readRawData());
-	}
-
-	public String getTagToString() throws IOException {
-		return input.readUTF();
-	}
-
-	public ByteString getData() throws IOException {
-		return ByteString.copyFrom(readRawData());
-	}
-
-	public int getInt() throws IOException {
-		validate();
-		return input.readInt();
-	}
-
-	public byte getByte() throws IOException {
-		validate();
-		return input.readByte();
-	}
-
-	public Instant getTimestamp() throws IOException {
-		validate();
-		Long watermarkMillis = input.readLong();
-		return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMillis));
-	}
-
-	public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException {
-		return deserializeObject(keySerializer);
-	}
-
-	public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException {
-		return objectSerializer.deserialize(input);
-	}
-
-	/////////			Helper Methods			///////
-
-	private byte[] readRawData() throws IOException {
-		validate();
-		int size = input.readInt();
-
-		byte[] serData = new byte[size];
-		int bytesRead = input.read(serData);
-		if (bytesRead != size) {
-			throw new RuntimeException("Error while deserializing checkpoint. Not enough bytes in the input stream.");
-		}
-		return serData;
-	}
-
-	private void validate() {
-		if (this.input == null) {
-			throw new RuntimeException("StateBackend not initialized yet.");
-		}
-	}
+  private final DataInputView input;
+
+  public StateCheckpointReader(DataInputView in) {
+    this.input = in;
+  }
+
+  public ByteString getTag() throws IOException {
+    return ByteString.copyFrom(readRawData());
+  }
+
+  public String getTagToString() throws IOException {
+    return input.readUTF();
+  }
+
+  public ByteString getData() throws IOException {
+    return ByteString.copyFrom(readRawData());
+  }
+
+  public int getInt() throws IOException {
+    validate();
+    return input.readInt();
+  }
+
+  public byte getByte() throws IOException {
+    validate();
+    return input.readByte();
+  }
+
+  public Instant getTimestamp() throws IOException {
+    validate();
+    Long watermarkMillis = input.readLong();
+    return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMillis));
+  }
+
+  public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException {
+    return deserializeObject(keySerializer);
+  }
+
+  public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException {
+    return objectSerializer.deserialize(input);
+  }
+
+  /////////      Helper Methods      ///////
+
+  private byte[] readRawData() throws IOException {
+    validate();
+    int size = input.readInt();
+
+    byte[] serData = new byte[size];
+    int bytesRead = input.read(serData);
+    if (bytesRead != size) {
+      throw new RuntimeException("Error while deserializing checkpoint. Not enough bytes in the input stream.");
+    }
+    return serData;
+  }
+
+  private void validate() {
+    if (this.input == null) {
+      throw new RuntimeException("StateBackend not initialized yet.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
index cd85163..1741829 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
@@ -34,120 +34,120 @@ import java.util.Set;
 
 public class StateCheckpointUtils {
 
-	public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals,
-							 StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException {
-		CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-
-		int noOfKeys = perKeyStateInternals.size();
-		writer.writeInt(noOfKeys);
-		for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) {
-			K key = keyStatePair.getKey();
-			FlinkStateInternals<K> state = keyStatePair.getValue();
-
-			// encode the key
-			writer.serializeKey(key, keySerializer);
-
-			// write the associated state
-			state.persistState(writer);
-		}
-	}
-
-	public static <K> Map<K, FlinkStateInternals<K>> decodeState(
-			StateCheckpointReader reader,
-			OutputTimeFn<? super BoundedWindow> outputTimeFn,
-			Coder<K> keyCoder,
-			Coder<? extends BoundedWindow> windowCoder,
-			ClassLoader classLoader) throws IOException, ClassNotFoundException {
-
-		int noOfKeys = reader.getInt();
-		Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys);
-		perKeyStateInternals.clear();
-
-		CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-		for (int i = 0; i < noOfKeys; i++) {
-
-			// decode the key.
-			K key = reader.deserializeKey(keySerializer);
-
-			//decode the state associated to the key.
-			FlinkStateInternals<K> stateForKey =
-					new FlinkStateInternals<>(key, keyCoder, windowCoder, outputTimeFn);
-			stateForKey.restoreState(reader, classLoader);
-			perKeyStateInternals.put(key, stateForKey);
-		}
-		return perKeyStateInternals;
-	}
-
-	//////////////				Encoding/Decoding the Timers				////////////////
-
-
-	public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers,
-							  StateCheckpointWriter writer,
-							  Coder<K> keyCoder) throws IOException {
-		CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-
-		int noOfKeys = allTimers.size();
-		writer.writeInt(noOfKeys);
-		for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) {
-			K key = timersPerKey.getKey();
-
-			// encode the key
-			writer.serializeKey(key, keySerializer);
-
-			// write the associated timers
-			Set<TimerInternals.TimerData> timers = timersPerKey.getValue();
-			encodeTimerDataForKey(writer, timers);
-		}
-	}
-
-	public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers(
-			StateCheckpointReader reader,
-			Coder<? extends BoundedWindow> windowCoder,
-			Coder<K> keyCoder) throws IOException {
-
-		int noOfKeys = reader.getInt();
-		Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys);
-		activeTimers.clear();
-
-		CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-		for (int i = 0; i < noOfKeys; i++) {
-
-			// decode the key.
-			K key = reader.deserializeKey(keySerializer);
-
-			// decode the associated timers.
-			Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder);
-			activeTimers.put(key, timers);
-		}
-		return activeTimers;
-	}
-
-	private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException {
-		// encode timers
-		writer.writeInt(timers.size());
-		for (TimerInternals.TimerData timer : timers) {
-			String stringKey = timer.getNamespace().stringKey();
-
-			writer.setTag(stringKey);
-			writer.setTimestamp(timer.getTimestamp());
-			writer.writeInt(timer.getDomain().ordinal());
-		}
-	}
-
-	private static Set<TimerInternals.TimerData> decodeTimerDataForKey(
-			StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException {
-
-		// decode the timers: first their number and then the content itself.
-		int noOfTimers = reader.getInt();
-		Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers);
-		for (int i = 0; i < noOfTimers; i++) {
-			String stringKey = reader.getTagToString();
-			Instant instant = reader.getTimestamp();
-			TimeDomain domain = TimeDomain.values()[reader.getInt()];
-
-			StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder);
-			timers.add(TimerInternals.TimerData.of(namespace, instant, domain));
-		}
-		return timers;
-	}
+  public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals,
+               StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException {
+    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+
+    int noOfKeys = perKeyStateInternals.size();
+    writer.writeInt(noOfKeys);
+    for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) {
+      K key = keyStatePair.getKey();
+      FlinkStateInternals<K> state = keyStatePair.getValue();
+
+      // encode the key
+      writer.serializeKey(key, keySerializer);
+
+      // write the associated state
+      state.persistState(writer);
+    }
+  }
+
+  public static <K> Map<K, FlinkStateInternals<K>> decodeState(
+      StateCheckpointReader reader,
+      OutputTimeFn<? super BoundedWindow> outputTimeFn,
+      Coder<K> keyCoder,
+      Coder<? extends BoundedWindow> windowCoder,
+      ClassLoader classLoader) throws IOException, ClassNotFoundException {
+
+    int noOfKeys = reader.getInt();
+    Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys);
+    perKeyStateInternals.clear();
+
+    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+    for (int i = 0; i < noOfKeys; i++) {
+
+      // decode the key.
+      K key = reader.deserializeKey(keySerializer);
+
+      //decode the state associated to the key.
+      FlinkStateInternals<K> stateForKey =
+          new FlinkStateInternals<>(key, keyCoder, windowCoder, outputTimeFn);
+      stateForKey.restoreState(reader, classLoader);
+      perKeyStateInternals.put(key, stateForKey);
+    }
+    return perKeyStateInternals;
+  }
+
+  //////////////        Encoding/Decoding the Timers        ////////////////
+
+
+  public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers,
+                StateCheckpointWriter writer,
+                Coder<K> keyCoder) throws IOException {
+    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+
+    int noOfKeys = allTimers.size();
+    writer.writeInt(noOfKeys);
+    for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) {
+      K key = timersPerKey.getKey();
+
+      // encode the key
+      writer.serializeKey(key, keySerializer);
+
+      // write the associated timers
+      Set<TimerInternals.TimerData> timers = timersPerKey.getValue();
+      encodeTimerDataForKey(writer, timers);
+    }
+  }
+
+  public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers(
+      StateCheckpointReader reader,
+      Coder<? extends BoundedWindow> windowCoder,
+      Coder<K> keyCoder) throws IOException {
+
+    int noOfKeys = reader.getInt();
+    Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys);
+    activeTimers.clear();
+
+    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+    for (int i = 0; i < noOfKeys; i++) {
+
+      // decode the key.
+      K key = reader.deserializeKey(keySerializer);
+
+      // decode the associated timers.
+      Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder);
+      activeTimers.put(key, timers);
+    }
+    return activeTimers;
+  }
+
+  private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException {
+    // encode timers
+    writer.writeInt(timers.size());
+    for (TimerInternals.TimerData timer : timers) {
+      String stringKey = timer.getNamespace().stringKey();
+
+      writer.setTag(stringKey);
+      writer.setTimestamp(timer.getTimestamp());
+      writer.writeInt(timer.getDomain().ordinal());
+    }
+  }
+
+  private static Set<TimerInternals.TimerData> decodeTimerDataForKey(
+      StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException {
+
+    // decode the timers: first their number and then the content itself.
+    int noOfTimers = reader.getInt();
+    Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers);
+    for (int i = 0; i < noOfTimers; i++) {
+      String stringKey = reader.getTagToString();
+      Instant instant = reader.getTimestamp();
+      TimeDomain domain = TimeDomain.values()[reader.getInt()];
+
+      StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder);
+      timers.add(TimerInternals.TimerData.of(namespace, instant, domain));
+    }
+    return timers;
+  }
 }