You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:37 UTC
[42/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
index dd14f68..0b1a5da 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -41,191 +41,191 @@ import static com.google.common.base.Preconditions.checkArgument;
* */
public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> {
- private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
+ private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
- private static final long serialVersionUID = 1L;
-
- private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
-
- private static final int CONNECTION_TIMEOUT_TIME = 0;
-
- private final String hostname;
- private final int port;
- private final char delimiter;
- private final long maxNumRetries;
- private final long delayBetweenRetries;
-
- public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
- this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
- }
-
- public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
- this.hostname = hostname;
- this.port = port;
- this.delimiter = delimiter;
- this.maxNumRetries = maxNumRetries;
- this.delayBetweenRetries = delayBetweenRetries;
- }
-
- public String getHostname() {
- return this.hostname;
- }
-
- public int getPort() {
- return this.port;
- }
-
- public char getDelimiter() {
- return this.delimiter;
- }
-
- public long getMaxNumRetries() {
- return this.maxNumRetries;
- }
-
- public long getDelayBetweenRetries() {
- return this.delayBetweenRetries;
- }
-
- @Override
- public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
- return Collections.<UnboundedSource<String, C>>singletonList(this);
- }
-
- @Override
- public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
- return new UnboundedSocketReader(this);
- }
-
- @Nullable
- @Override
- public Coder getCheckpointMarkCoder() {
- // Flink and Dataflow have different checkpointing mechanisms.
- // In our case we do not need a coder.
- return null;
- }
-
- @Override
- public void validate() {
- checkArgument(port > 0 && port < 65536, "port is out of range");
- checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
- checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
- }
-
- @Override
- public Coder getDefaultOutputCoder() {
- return DEFAULT_SOCKET_CODER;
- }
-
- public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
-
- private static final long serialVersionUID = 7526472295622776147L;
- private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
-
- private final UnboundedSocketSource source;
-
- private Socket socket;
- private BufferedReader reader;
-
- private boolean isRunning;
-
- private String currentRecord;
-
- public UnboundedSocketReader(UnboundedSocketSource source) {
- this.source = source;
- }
-
- private void openConnection() throws IOException {
- this.socket = new Socket();
- this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
- this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
- this.isRunning = true;
- }
-
- @Override
- public boolean start() throws IOException {
- int attempt = 0;
- while (!isRunning) {
- try {
- openConnection();
- LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
-
- return advance();
- } catch (IOException e) {
- LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
-
- if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
- try {
- Thread.sleep(this.source.getDelayBetweenRetries());
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- } else {
- this.isRunning = false;
- break;
- }
- }
- }
- LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
- return false;
- }
-
- @Override
- public boolean advance() throws IOException {
- final StringBuilder buffer = new StringBuilder();
- int data;
- while (isRunning && (data = reader.read()) != -1) {
- // check if the string is complete
- if (data != this.source.getDelimiter()) {
- buffer.append((char) data);
- } else {
- if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
- buffer.setLength(buffer.length() - 1);
- }
- this.currentRecord = buffer.toString();
- buffer.setLength(0);
- return true;
- }
- }
- return false;
- }
-
- @Override
- public byte[] getCurrentRecordId() throws NoSuchElementException {
- return new byte[0];
- }
-
- @Override
- public String getCurrent() throws NoSuchElementException {
- return this.currentRecord;
- }
-
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- return Instant.now();
- }
-
- @Override
- public void close() throws IOException {
- this.reader.close();
- this.socket.close();
- this.isRunning = false;
- LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
- }
-
- @Override
- public Instant getWatermark() {
- return Instant.now();
- }
-
- @Override
- public CheckpointMark getCheckpointMark() {
- return null;
- }
-
- @Override
- public UnboundedSource<String, ?> getCurrentSource() {
- return this.source;
- }
- }
+ private static final long serialVersionUID = 1L;
+
+ private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
+
+ private static final int CONNECTION_TIMEOUT_TIME = 0;
+
+ private final String hostname;
+ private final int port;
+ private final char delimiter;
+ private final long maxNumRetries;
+ private final long delayBetweenRetries;
+
+ public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
+ this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
+ }
+
+ public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
+ this.hostname = hostname;
+ this.port = port;
+ this.delimiter = delimiter;
+ this.maxNumRetries = maxNumRetries;
+ this.delayBetweenRetries = delayBetweenRetries;
+ }
+
+ public String getHostname() {
+ return this.hostname;
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public char getDelimiter() {
+ return this.delimiter;
+ }
+
+ public long getMaxNumRetries() {
+ return this.maxNumRetries;
+ }
+
+ public long getDelayBetweenRetries() {
+ return this.delayBetweenRetries;
+ }
+
+ @Override
+ public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+ return Collections.<UnboundedSource<String, C>>singletonList(this);
+ }
+
+ @Override
+ public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+ return new UnboundedSocketReader(this);
+ }
+
+ @Nullable
+ @Override
+ public Coder getCheckpointMarkCoder() {
+ // Flink and Dataflow have different checkpointing mechanisms.
+ // In our case we do not need a coder.
+ return null;
+ }
+
+ @Override
+ public void validate() {
+ checkArgument(port > 0 && port < 65536, "port is out of range");
+ checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
+ checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
+ }
+
+ @Override
+ public Coder getDefaultOutputCoder() {
+ return DEFAULT_SOCKET_CODER;
+ }
+
+ public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
+
+ private static final long serialVersionUID = 7526472295622776147L;
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
+
+ private final UnboundedSocketSource source;
+
+ private Socket socket;
+ private BufferedReader reader;
+
+ private boolean isRunning;
+
+ private String currentRecord;
+
+ public UnboundedSocketReader(UnboundedSocketSource source) {
+ this.source = source;
+ }
+
+ private void openConnection() throws IOException {
+ this.socket = new Socket();
+ this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
+ this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
+ this.isRunning = true;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ int attempt = 0;
+ while (!isRunning) {
+ try {
+ openConnection();
+ LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
+
+ return advance();
+ } catch (IOException e) {
+ LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
+
+ if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
+ try {
+ Thread.sleep(this.source.getDelayBetweenRetries());
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ } else {
+ this.isRunning = false;
+ break;
+ }
+ }
+ }
+ LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
+ return false;
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ final StringBuilder buffer = new StringBuilder();
+ int data;
+ while (isRunning && (data = reader.read()) != -1) {
+ // check if the string is complete
+ if (data != this.source.getDelimiter()) {
+ buffer.append((char) data);
+ } else {
+ if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
+ buffer.setLength(buffer.length() - 1);
+ }
+ this.currentRecord = buffer.toString();
+ buffer.setLength(0);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
+ return new byte[0];
+ }
+
+ @Override
+ public String getCurrent() throws NoSuchElementException {
+ return this.currentRecord;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return Instant.now();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.reader.close();
+ this.socket.close();
+ this.isRunning = false;
+ LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return null;
+ }
+
+ @Override
+ public UnboundedSource<String, ?> getCurrentSource() {
+ return this.source;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index e065f87..5a89894 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -38,95 +38,95 @@ import org.joda.time.Instant;
* */
public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {
- private final String name;
- private final UnboundedSource.UnboundedReader<T> reader;
-
- private StreamingRuntimeContext runtime = null;
- private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
-
- private volatile boolean isRunning = false;
-
- public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
- this.name = transform.getName();
- this.reader = transform.getSource().createReader(options, null);
- }
-
- public String getName() {
- return this.name;
- }
-
- WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
- if (timestamp == null) {
- timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
- return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
- }
-
- @Override
- public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
- if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
- throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
- "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
- }
-
- context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
- runtime = (StreamingRuntimeContext) getRuntimeContext();
-
- this.isRunning = true;
- boolean inputAvailable = reader.start();
-
- setNextWatermarkTimer(this.runtime);
-
- while (isRunning) {
-
- while (!inputAvailable && isRunning) {
- // wait a bit until we retry to pull more records
- Thread.sleep(50);
- inputAvailable = reader.advance();
- }
-
- if (inputAvailable) {
-
- // get it and its timestamp from the source
- T item = reader.getCurrent();
- Instant timestamp = reader.getCurrentTimestamp();
-
- // write it to the output collector
- synchronized (ctx.getCheckpointLock()) {
- context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
- }
-
- inputAvailable = reader.advance();
- }
-
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public void trigger(long timestamp) throws Exception {
- if (this.isRunning) {
- synchronized (context.getCheckpointLock()) {
- long watermarkMillis = this.reader.getWatermark().getMillis();
- context.emitWatermark(new Watermark(watermarkMillis));
- }
- setNextWatermarkTimer(this.runtime);
- }
- }
-
- private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
- if (this.isRunning) {
- long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
- long timeToNextWatermark = getTimeToNextWaternark(watermarkInterval);
- runtime.registerTimer(timeToNextWatermark, this);
- }
- }
-
- private long getTimeToNextWaternark(long watermarkInterval) {
- return System.currentTimeMillis() + watermarkInterval;
- }
+ private final String name;
+ private final UnboundedSource.UnboundedReader<T> reader;
+
+ private StreamingRuntimeContext runtime = null;
+ private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
+
+ private volatile boolean isRunning = false;
+
+ public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
+ this.name = transform.getName();
+ this.reader = transform.getSource().createReader(options, null);
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
+ if (timestamp == null) {
+ timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+ return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ }
+
+ @Override
+ public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
+ if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
+ throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
+ "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
+ }
+
+ context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
+ runtime = (StreamingRuntimeContext) getRuntimeContext();
+
+ this.isRunning = true;
+ boolean inputAvailable = reader.start();
+
+ setNextWatermarkTimer(this.runtime);
+
+ while (isRunning) {
+
+ while (!inputAvailable && isRunning) {
+ // wait a bit until we retry to pull more records
+ Thread.sleep(50);
+ inputAvailable = reader.advance();
+ }
+
+ if (inputAvailable) {
+
+ // get it and its timestamp from the source
+ T item = reader.getCurrent();
+ Instant timestamp = reader.getCurrentTimestamp();
+
+ // write it to the output collector
+ synchronized (ctx.getCheckpointLock()) {
+ context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+ }
+
+ inputAvailable = reader.advance();
+ }
+
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ if (this.isRunning) {
+ synchronized (context.getCheckpointLock()) {
+ long watermarkMillis = this.reader.getWatermark().getMillis();
+ context.emitWatermark(new Watermark(watermarkMillis));
+ }
+ setNextWatermarkTimer(this.runtime);
+ }
+ }
+
+ private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
+ if (this.isRunning) {
+ long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
+ long timeToNextWatermark = getTimeToNextWaternark(watermarkInterval);
+ runtime.registerTimer(timeToNextWatermark, this);
+ }
+ }
+
+ private long getTimeToNextWaternark(long watermarkInterval) {
+ return System.currentTimeMillis() + watermarkInterval;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
index 84a322f..75c8ac6 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
@@ -34,93 +34,93 @@ import java.io.Serializable;
* The latter is used when snapshots of the current state are taken, for fault-tolerance.
* */
public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable {
- private Instant currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
- private Instant currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
- public void setCurrentInputWatermark(Instant watermark) {
- checkIfValidInputWatermark(watermark);
- this.currentInputWatermark = watermark;
- }
-
- public void setCurrentOutputWatermark(Instant watermark) {
- checkIfValidOutputWatermark(watermark);
- this.currentOutputWatermark = watermark;
- }
-
- private void setCurrentInputWatermarkAfterRecovery(Instant watermark) {
- if (!currentInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
- throw new RuntimeException("Explicitly setting the input watermark is only allowed on " +
- "initialization after recovery from a node failure. Apparently this is not " +
- "the case here as the watermark is already set.");
- }
- this.currentInputWatermark = watermark;
- }
-
- private void setCurrentOutputWatermarkAfterRecovery(Instant watermark) {
- if (!currentOutputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
- throw new RuntimeException("Explicitly setting the output watermark is only allowed on " +
- "initialization after recovery from a node failure. Apparently this is not " +
- "the case here as the watermark is already set.");
- }
- this.currentOutputWatermark = watermark;
- }
-
- @Override
- public Instant currentProcessingTime() {
- return Instant.now();
- }
-
- @Override
- public Instant currentInputWatermarkTime() {
- return currentInputWatermark;
- }
-
- @Nullable
- @Override
- public Instant currentSynchronizedProcessingTime() {
- // TODO
- return null;
- }
-
- @Override
- public Instant currentOutputWatermarkTime() {
- return currentOutputWatermark;
- }
-
- private void checkIfValidInputWatermark(Instant newWatermark) {
- if (currentInputWatermark.isAfter(newWatermark)) {
- throw new IllegalArgumentException(String.format(
- "Cannot set current input watermark to %s. Newer watermarks " +
- "must be no earlier than the current one (%s).",
- newWatermark, currentInputWatermark));
- }
- }
-
- private void checkIfValidOutputWatermark(Instant newWatermark) {
- if (currentOutputWatermark.isAfter(newWatermark)) {
- throw new IllegalArgumentException(String.format(
- "Cannot set current output watermark to %s. Newer watermarks " +
- "must be no earlier than the current one (%s).",
- newWatermark, currentOutputWatermark));
- }
- }
-
- public void encodeTimerInternals(DoFn.ProcessContext context,
- StateCheckpointWriter writer,
- KvCoder<K, VIN> kvCoder,
- Coder<? extends BoundedWindow> windowCoder) throws IOException {
- if (context == null) {
- throw new RuntimeException("The Context has not been initialized.");
- }
-
- writer.setTimestamp(currentInputWatermark);
- writer.setTimestamp(currentOutputWatermark);
- }
-
- public void restoreTimerInternals(StateCheckpointReader reader,
- KvCoder<K, VIN> kvCoder,
- Coder<? extends BoundedWindow> windowCoder) throws IOException {
- setCurrentInputWatermarkAfterRecovery(reader.getTimestamp());
- setCurrentOutputWatermarkAfterRecovery(reader.getTimestamp());
- }
+ private Instant currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ private Instant currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ public void setCurrentInputWatermark(Instant watermark) {
+ checkIfValidInputWatermark(watermark);
+ this.currentInputWatermark = watermark;
+ }
+
+ public void setCurrentOutputWatermark(Instant watermark) {
+ checkIfValidOutputWatermark(watermark);
+ this.currentOutputWatermark = watermark;
+ }
+
+ private void setCurrentInputWatermarkAfterRecovery(Instant watermark) {
+ if (!currentInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+ throw new RuntimeException("Explicitly setting the input watermark is only allowed on " +
+ "initialization after recovery from a node failure. Apparently this is not " +
+ "the case here as the watermark is already set.");
+ }
+ this.currentInputWatermark = watermark;
+ }
+
+ private void setCurrentOutputWatermarkAfterRecovery(Instant watermark) {
+ if (!currentOutputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+ throw new RuntimeException("Explicitly setting the output watermark is only allowed on " +
+ "initialization after recovery from a node failure. Apparently this is not " +
+ "the case here as the watermark is already set.");
+ }
+ this.currentOutputWatermark = watermark;
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return Instant.now();
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return currentInputWatermark;
+ }
+
+ @Nullable
+ @Override
+ public Instant currentSynchronizedProcessingTime() {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public Instant currentOutputWatermarkTime() {
+ return currentOutputWatermark;
+ }
+
+ private void checkIfValidInputWatermark(Instant newWatermark) {
+ if (currentInputWatermark.isAfter(newWatermark)) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot set current input watermark to %s. Newer watermarks " +
+ "must be no earlier than the current one (%s).",
+ newWatermark, currentInputWatermark));
+ }
+ }
+
+ private void checkIfValidOutputWatermark(Instant newWatermark) {
+ if (currentOutputWatermark.isAfter(newWatermark)) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot set current output watermark to %s. Newer watermarks " +
+ "must be no earlier than the current one (%s).",
+ newWatermark, currentOutputWatermark));
+ }
+ }
+
+ public void encodeTimerInternals(DoFn.ProcessContext context,
+ StateCheckpointWriter writer,
+ KvCoder<K, VIN> kvCoder,
+ Coder<? extends BoundedWindow> windowCoder) throws IOException {
+ if (context == null) {
+ throw new RuntimeException("The Context has not been initialized.");
+ }
+
+ writer.setTimestamp(currentInputWatermark);
+ writer.setTimestamp(currentOutputWatermark);
+ }
+
+ public void restoreTimerInternals(StateCheckpointReader reader,
+ KvCoder<K, VIN> kvCoder,
+ Coder<? extends BoundedWindow> windowCoder) throws IOException {
+ setCurrentInputWatermarkAfterRecovery(reader.getTimestamp());
+ setCurrentOutputWatermarkAfterRecovery(reader.getTimestamp());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
index 41ab5f0..39fec14 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -41,673 +41,673 @@ import java.util.*;
*/
public class FlinkStateInternals<K> implements StateInternals<K> {
- private final K key;
-
- private final Coder<K> keyCoder;
-
- private final Coder<? extends BoundedWindow> windowCoder;
-
- private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
-
- private Instant watermarkHoldAccessor;
-
- public FlinkStateInternals(K key,
- Coder<K> keyCoder,
- Coder<? extends BoundedWindow> windowCoder,
- OutputTimeFn<? super BoundedWindow> outputTimeFn) {
- this.key = key;
- this.keyCoder = keyCoder;
- this.windowCoder = windowCoder;
- this.outputTimeFn = outputTimeFn;
- }
-
- public Instant getWatermarkHold() {
- return watermarkHoldAccessor;
- }
-
- /**
- * This is the interface state has to implement in order for it to be fault tolerant when
- * executed by the FlinkPipelineRunner.
- */
- private interface CheckpointableIF {
-
- boolean shouldPersist();
-
- void persistState(StateCheckpointWriter checkpointBuilder) throws IOException;
- }
-
- protected final StateTable<K> inMemoryState = new StateTable<K>() {
- @Override
- protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace, final StateContext<?> c) {
- return new StateTag.StateBinder<K>() {
-
- @Override
- public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
- return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder);
- }
-
- @Override
- public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
- return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
- }
-
- @Override
- public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
- }
-
- @Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
- return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, address), outputTimeFn);
- }
- };
- }
- };
-
- @Override
- public K getKey() {
- return key;
- }
-
- @Override
- public <StateT extends State> StateT state(StateNamespace namespace, StateTag<? super K, StateT> address) {
- return inMemoryState.get(namespace, address, null);
- }
-
- @Override
- public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
- return inMemoryState.get(namespace, address, c);
- }
-
- public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
- checkpointBuilder.writeInt(getNoOfElements());
-
- for (State location : inMemoryState.values()) {
- if (!(location instanceof CheckpointableIF)) {
- throw new IllegalStateException(String.format(
- "%s wasn't created by %s -- unable to persist it",
- location.getClass().getSimpleName(),
- getClass().getSimpleName()));
- }
- ((CheckpointableIF) location).persistState(checkpointBuilder);
- }
- }
-
- public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader)
- throws IOException, ClassNotFoundException {
-
- // the number of elements to read.
- int noOfElements = checkpointReader.getInt();
- for (int i = 0; i < noOfElements; i++) {
- decodeState(checkpointReader, loader);
- }
- }
-
- /**
- * We remove the first character which encodes the type of the stateTag ('s' for system
- * and 'u' for user). For more details check out the source of
- * {@link StateTags.StateTagBase#getId()}.
- */
- private void decodeState(StateCheckpointReader reader, ClassLoader loader)
- throws IOException, ClassNotFoundException {
-
- StateType stateItemType = StateType.deserialize(reader);
- ByteString stateKey = reader.getTag();
-
- // first decode the namespace and the tagId...
- String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+");
- if (namespaceAndTag.length != 2) {
- throw new IllegalArgumentException("Invalid stateKey " + stateKey.toString() + ".");
- }
- StateNamespace namespace = StateNamespaces.fromString(namespaceAndTag[0], windowCoder);
-
- // ... decide if it is a system or user stateTag...
- char ownerTag = namespaceAndTag[1].charAt(0);
- if (ownerTag != 's' && ownerTag != 'u') {
- throw new RuntimeException("Invalid StateTag name.");
- }
- boolean isSystemTag = ownerTag == 's';
- String tagId = namespaceAndTag[1].substring(1);
-
- // ...then decode the coder (if there is one)...
- Coder<?> coder = null;
- switch (stateItemType) {
- case VALUE:
- case LIST:
- case ACCUMULATOR:
- ByteString coderBytes = reader.getData();
- coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader);
- break;
- case WATERMARK:
- break;
- }
-
- // ...then decode the combiner function (if there is one)...
- CombineWithContext.KeyedCombineFnWithContext<? super K, ?, ?, ?> combineFn = null;
- switch (stateItemType) {
- case ACCUMULATOR:
- ByteString combinerBytes = reader.getData();
- combineFn = InstantiationUtil.deserializeObject(combinerBytes.toByteArray(), loader);
- break;
- case VALUE:
- case LIST:
- case WATERMARK:
- break;
- }
-
- //... and finally, depending on the type of the state being decoded,
- // 1) create the adequate stateTag,
- // 2) create the state container,
- // 3) restore the actual content.
- switch (stateItemType) {
- case VALUE: {
- StateTag stateTag = StateTags.value(tagId, coder);
- stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
- @SuppressWarnings("unchecked")
- FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag, null);
- value.restoreState(reader);
- break;
- }
- case WATERMARK: {
- @SuppressWarnings("unchecked")
- StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = StateTags.watermarkStateInternal(tagId, outputTimeFn);
- stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
- @SuppressWarnings("unchecked")
- FlinkWatermarkHoldStateImpl<?> watermark = (FlinkWatermarkHoldStateImpl<?>) inMemoryState.get(namespace, stateTag, null);
- watermark.restoreState(reader);
- break;
- }
- case LIST: {
- StateTag stateTag = StateTags.bag(tagId, coder);
- stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
- FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag, null);
- bag.restoreState(reader);
- break;
- }
- case ACCUMULATOR: {
- @SuppressWarnings("unchecked")
- StateTag<K, AccumulatorCombiningState<?, ?, ?>> stateTag = StateTags.keyedCombiningValueWithContext(tagId, (Coder) coder, combineFn);
- stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
- @SuppressWarnings("unchecked")
- FlinkInMemoryKeyedCombiningValue<?, ?, ?> combiningValue =
- (FlinkInMemoryKeyedCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag, null);
- combiningValue.restoreState(reader);
- break;
- }
- default:
- throw new RuntimeException("Unknown State Type " + stateItemType + ".");
- }
- }
-
- private ByteString encodeKey(StateNamespace namespace, StateTag<? super K, ?> address) {
- StringBuilder sb = new StringBuilder();
- try {
- namespace.appendTo(sb);
- sb.append('+');
- address.appendTo(sb);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return ByteString.copyFromUtf8(sb.toString());
- }
-
- private int getNoOfElements() {
- int noOfElements = 0;
- for (State state : inMemoryState.values()) {
- if (!(state instanceof CheckpointableIF)) {
- throw new RuntimeException("State Implementations used by the " +
- "Flink Dataflow Runner should implement the CheckpointableIF interface.");
- }
-
- if (((CheckpointableIF) state).shouldPersist()) {
- noOfElements++;
- }
- }
- return noOfElements;
- }
-
- private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF {
-
- private final ByteString stateKey;
- private final Coder<T> elemCoder;
-
- private T value = null;
-
- public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) {
- this.stateKey = stateKey;
- this.elemCoder = elemCoder;
- }
-
- @Override
- public void clear() {
- value = null;
- }
-
- @Override
- public void write(T input) {
- this.value = input;
- }
-
- @Override
- public T read() {
- return value;
- }
-
- @Override
- public ValueState<T> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public boolean shouldPersist() {
- return value != null;
- }
-
- @Override
- public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
- if (value != null) {
- // serialize the coder.
- byte[] coder = InstantiationUtil.serializeObject(elemCoder);
-
- // encode the value into a ByteString
- ByteString.Output stream = ByteString.newOutput();
- elemCoder.encode(value, stream, Coder.Context.OUTER);
- ByteString data = stream.toByteString();
-
- checkpointBuilder.addValueBuilder()
- .setTag(stateKey)
- .setData(coder)
- .setData(data);
- }
- }
-
- public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
- ByteString valueContent = checkpointReader.getData();
- T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
- write(outValue);
- }
- }
-
- private final class FlinkWatermarkHoldStateImpl<W extends BoundedWindow>
- implements WatermarkHoldState<W>, CheckpointableIF {
-
- private final ByteString stateKey;
-
- private Instant minimumHold = null;
-
- private OutputTimeFn<? super W> outputTimeFn;
-
- public FlinkWatermarkHoldStateImpl(ByteString stateKey, OutputTimeFn<? super W> outputTimeFn) {
- this.stateKey = stateKey;
- this.outputTimeFn = outputTimeFn;
- }
-
- @Override
- public void clear() {
- // Even though we're clearing we can't remove this from the in-memory state map, since
- // other users may already have a handle on this WatermarkBagInternal.
- minimumHold = null;
- watermarkHoldAccessor = null;
- }
-
- @Override
- public void add(Instant watermarkHold) {
- if (minimumHold == null || minimumHold.isAfter(watermarkHold)) {
- watermarkHoldAccessor = watermarkHold;
- minimumHold = watermarkHold;
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- return minimumHold == null;
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- // Ignore
- return this;
- }
- };
- }
-
- @Override
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return outputTimeFn;
- }
-
- @Override
- public Instant read() {
- return minimumHold;
- }
-
- @Override
- public WatermarkHoldState<W> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public String toString() {
- return Objects.toString(minimumHold);
- }
-
- @Override
- public boolean shouldPersist() {
- return minimumHold != null;
- }
-
- @Override
- public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
- if (minimumHold != null) {
- checkpointBuilder.addWatermarkHoldsBuilder()
- .setTag(stateKey)
- .setTimestamp(minimumHold);
- }
- }
-
- public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
- Instant watermark = checkpointReader.getTimestamp();
- add(watermark);
- }
- }
-
-
- private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withContext(
- final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(K key, CombineWithContext.Context c) {
- return combineFn.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
- return combineFn.addInput(key, accumulator, value);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
- return combineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
- return combineFn.extractOutput(key, accumulator);
- }
- };
- }
-
- private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withKeyAndContext(
- final Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
- return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
- @Override
- public AccumT createAccumulator(K key, CombineWithContext.Context c) {
- return combineFn.createAccumulator();
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
- return combineFn.addInput(accumulator, value);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
- return combineFn.mergeAccumulators(accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
- return combineFn.extractOutput(accumulator);
- }
- };
- }
-
- private final class FlinkInMemoryKeyedCombiningValue<InputT, AccumT, OutputT>
- implements AccumulatorCombiningState<InputT, AccumT, OutputT>, CheckpointableIF {
-
- private final ByteString stateKey;
- private final CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn;
- private final Coder<AccumT> accumCoder;
- private final CombineWithContext.Context context;
-
- private AccumT accum = null;
- private boolean isClear = true;
-
- private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
- Coder<AccumT> accumCoder,
- final StateContext<?> stateContext) {
- this(stateKey, withKeyAndContext(combineFn), accumCoder, stateContext);
- }
-
-
- private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
- Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
- Coder<AccumT> accumCoder,
- final StateContext<?> stateContext) {
- this(stateKey, withContext(combineFn), accumCoder, stateContext);
- }
-
- private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
- CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn,
- Coder<AccumT> accumCoder,
- final StateContext<?> stateContext) {
- Preconditions.checkNotNull(combineFn);
- Preconditions.checkNotNull(accumCoder);
-
- this.stateKey = stateKey;
- this.combineFn = combineFn;
- this.accumCoder = accumCoder;
- this.context = new CombineWithContext.Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- return stateContext.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return stateContext.sideInput(view);
- }
- };
- accum = combineFn.createAccumulator(key, context);
- }
-
- @Override
- public void clear() {
- accum = combineFn.createAccumulator(key, context);
- isClear = true;
- }
-
- @Override
- public void add(InputT input) {
- isClear = false;
- accum = combineFn.addInput(key, accum, input, context);
- }
-
- @Override
- public AccumT getAccum() {
- return accum;
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public ReadableState<Boolean> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public Boolean read() {
- return isClear;
- }
- };
- }
-
- @Override
- public void addAccum(AccumT accum) {
- isClear = false;
- this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum), context);
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(key, accumulators, context);
- }
-
- @Override
- public OutputT read() {
- return combineFn.extractOutput(key, accum, context);
- }
-
- @Override
- public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public boolean shouldPersist() {
- return !isClear;
- }
-
- @Override
- public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
- if (!isClear) {
- // serialize the coder.
- byte[] coder = InstantiationUtil.serializeObject(accumCoder);
-
- // serialize the combiner.
- byte[] combiner = InstantiationUtil.serializeObject(combineFn);
-
- // encode the accumulator into a ByteString
- ByteString.Output stream = ByteString.newOutput();
- accumCoder.encode(accum, stream, Coder.Context.OUTER);
- ByteString data = stream.toByteString();
-
- // put the flag that the next serialized element is an accumulator
- checkpointBuilder.addAccumulatorBuilder()
- .setTag(stateKey)
- .setData(coder)
- .setData(combiner)
- .setData(data);
- }
- }
-
- public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
- ByteString valueContent = checkpointReader.getData();
- AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
- addAccum(accum);
- }
- }
-
- private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF {
- private final List<T> contents = new ArrayList<>();
-
- private final ByteString stateKey;
- private final Coder<T> elemCoder;
-
- public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) {
- this.stateKey = stateKey;
- this.elemCoder = elemCoder;
- }
-
- @Override
- public void clear() {
- contents.clear();
- }
-
- @Override
- public Iterable<T> read() {
- return contents;
- }
-
- @Override
- public BagState<T> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public void add(T input) {
- contents.add(input);
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public ReadableState<Boolean> readLater() {
- // Ignore
- return this;
- }
-
- @Override
- public Boolean read() {
- return contents.isEmpty();
- }
- };
- }
-
- @Override
- public boolean shouldPersist() {
- return !contents.isEmpty();
- }
-
- @Override
- public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
- if (!contents.isEmpty()) {
- // serialize the coder.
- byte[] coder = InstantiationUtil.serializeObject(elemCoder);
-
- checkpointBuilder.addListUpdatesBuilder()
- .setTag(stateKey)
- .setData(coder)
- .writeInt(contents.size());
-
- for (T item : contents) {
- // encode the element
- ByteString.Output stream = ByteString.newOutput();
- elemCoder.encode(item, stream, Coder.Context.OUTER);
- ByteString data = stream.toByteString();
-
- // add the data to the checkpoint.
- checkpointBuilder.setData(data);
- }
- }
- }
-
- public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
- int noOfValues = checkpointReader.getInt();
- for (int j = 0; j < noOfValues; j++) {
- ByteString valueContent = checkpointReader.getData();
- T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
- add(outValue);
- }
- }
- }
+ private final K key;
+
+ private final Coder<K> keyCoder;
+
+ private final Coder<? extends BoundedWindow> windowCoder;
+
+ private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+
+ private Instant watermarkHoldAccessor;
+
+ public FlinkStateInternals(K key,
+ Coder<K> keyCoder,
+ Coder<? extends BoundedWindow> windowCoder,
+ OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+ this.key = key;
+ this.keyCoder = keyCoder;
+ this.windowCoder = windowCoder;
+ this.outputTimeFn = outputTimeFn;
+ }
+
+ public Instant getWatermarkHold() {
+ return watermarkHoldAccessor;
+ }
+
+ /**
+ * This is the interface state has to implement in order for it to be fault tolerant when
+ * executed by the FlinkPipelineRunner.
+ */
+ private interface CheckpointableIF {
+
+ boolean shouldPersist();
+
+ void persistState(StateCheckpointWriter checkpointBuilder) throws IOException;
+ }
+
+ protected final StateTable<K> inMemoryState = new StateTable<K>() {
+ @Override
+ protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace, final StateContext<?> c) {
+ return new StateTag.StateBinder<K>() {
+
+ @Override
+ public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+ return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder);
+ }
+
+ @Override
+ public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+ return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder);
+ }
+
+ @Override
+ public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
+ StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
+ }
+
+ @Override
+ public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+ StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder,
+ Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+ return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
+ }
+
+ @Override
+ public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+ StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+ Coder<AccumT> accumCoder,
+ CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
+ return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
+ }
+
+ @Override
+ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
+ return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, address), outputTimeFn);
+ }
+ };
+ }
+ };
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public <StateT extends State> StateT state(StateNamespace namespace, StateTag<? super K, StateT> address) {
+ return inMemoryState.get(namespace, address, null);
+ }
+
+ @Override
+ public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
+ return inMemoryState.get(namespace, address, c);
+ }
+
+ public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+ checkpointBuilder.writeInt(getNoOfElements());
+
+ for (State location : inMemoryState.values()) {
+ if (!(location instanceof CheckpointableIF)) {
+ throw new IllegalStateException(String.format(
+ "%s wasn't created by %s -- unable to persist it",
+ location.getClass().getSimpleName(),
+ getClass().getSimpleName()));
+ }
+ ((CheckpointableIF) location).persistState(checkpointBuilder);
+ }
+ }
+
+ public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader)
+ throws IOException, ClassNotFoundException {
+
+ // the number of elements to read.
+ int noOfElements = checkpointReader.getInt();
+ for (int i = 0; i < noOfElements; i++) {
+ decodeState(checkpointReader, loader);
+ }
+ }
+
+ /**
+ * We remove the first character which encodes the type of the stateTag ('s' for system
+ * and 'u' for user). For more details check out the source of
+ * {@link StateTags.StateTagBase#getId()}.
+ */
+ private void decodeState(StateCheckpointReader reader, ClassLoader loader)
+ throws IOException, ClassNotFoundException {
+
+ StateType stateItemType = StateType.deserialize(reader);
+ ByteString stateKey = reader.getTag();
+
+ // first decode the namespace and the tagId...
+ String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+");
+ if (namespaceAndTag.length != 2) {
+ throw new IllegalArgumentException("Invalid stateKey " + stateKey.toString() + ".");
+ }
+ StateNamespace namespace = StateNamespaces.fromString(namespaceAndTag[0], windowCoder);
+
+ // ... decide if it is a system or user stateTag...
+ char ownerTag = namespaceAndTag[1].charAt(0);
+ if (ownerTag != 's' && ownerTag != 'u') {
+ throw new RuntimeException("Invalid StateTag name.");
+ }
+ boolean isSystemTag = ownerTag == 's';
+ String tagId = namespaceAndTag[1].substring(1);
+
+ // ...then decode the coder (if there is one)...
+ Coder<?> coder = null;
+ switch (stateItemType) {
+ case VALUE:
+ case LIST:
+ case ACCUMULATOR:
+ ByteString coderBytes = reader.getData();
+ coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader);
+ break;
+ case WATERMARK:
+ break;
+ }
+
+ // ...then decode the combiner function (if there is one)...
+ CombineWithContext.KeyedCombineFnWithContext<? super K, ?, ?, ?> combineFn = null;
+ switch (stateItemType) {
+ case ACCUMULATOR:
+ ByteString combinerBytes = reader.getData();
+ combineFn = InstantiationUtil.deserializeObject(combinerBytes.toByteArray(), loader);
+ break;
+ case VALUE:
+ case LIST:
+ case WATERMARK:
+ break;
+ }
+
+ //... and finally, depending on the type of the state being decoded,
+ // 1) create the adequate stateTag,
+ // 2) create the state container,
+ // 3) restore the actual content.
+ switch (stateItemType) {
+ case VALUE: {
+ StateTag stateTag = StateTags.value(tagId, coder);
+ stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+ @SuppressWarnings("unchecked")
+ FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag, null);
+ value.restoreState(reader);
+ break;
+ }
+ case WATERMARK: {
+ @SuppressWarnings("unchecked")
+ StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = StateTags.watermarkStateInternal(tagId, outputTimeFn);
+ stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+ @SuppressWarnings("unchecked")
+ FlinkWatermarkHoldStateImpl<?> watermark = (FlinkWatermarkHoldStateImpl<?>) inMemoryState.get(namespace, stateTag, null);
+ watermark.restoreState(reader);
+ break;
+ }
+ case LIST: {
+ StateTag stateTag = StateTags.bag(tagId, coder);
+ stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+ FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag, null);
+ bag.restoreState(reader);
+ break;
+ }
+ case ACCUMULATOR: {
+ @SuppressWarnings("unchecked")
+ StateTag<K, AccumulatorCombiningState<?, ?, ?>> stateTag = StateTags.keyedCombiningValueWithContext(tagId, (Coder) coder, combineFn);
+ stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+ @SuppressWarnings("unchecked")
+ FlinkInMemoryKeyedCombiningValue<?, ?, ?> combiningValue =
+ (FlinkInMemoryKeyedCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag, null);
+ combiningValue.restoreState(reader);
+ break;
+ }
+ default:
+ throw new RuntimeException("Unknown State Type " + stateItemType + ".");
+ }
+ }
+
+ private ByteString encodeKey(StateNamespace namespace, StateTag<? super K, ?> address) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ namespace.appendTo(sb);
+ sb.append('+');
+ address.appendTo(sb);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return ByteString.copyFromUtf8(sb.toString());
+ }
+
+ private int getNoOfElements() {
+ int noOfElements = 0;
+ for (State state : inMemoryState.values()) {
+ if (!(state instanceof CheckpointableIF)) {
+ throw new RuntimeException("State Implementations used by the " +
+ "Flink Dataflow Runner should implement the CheckpointableIF interface.");
+ }
+
+ if (((CheckpointableIF) state).shouldPersist()) {
+ noOfElements++;
+ }
+ }
+ return noOfElements;
+ }
+
+ private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF {
+
+ private final ByteString stateKey;
+ private final Coder<T> elemCoder;
+
+ private T value = null;
+
+ public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) {
+ this.stateKey = stateKey;
+ this.elemCoder = elemCoder;
+ }
+
+ @Override
+ public void clear() {
+ value = null;
+ }
+
+ @Override
+ public void write(T input) {
+ this.value = input;
+ }
+
+ @Override
+ public T read() {
+ return value;
+ }
+
+ @Override
+ public ValueState<T> readLater() {
+ // Ignore
+ return this;
+ }
+
+ @Override
+ public boolean shouldPersist() {
+ return value != null;
+ }
+
+ @Override
+ public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+ if (value != null) {
+ // serialize the coder.
+ byte[] coder = InstantiationUtil.serializeObject(elemCoder);
+
+ // encode the value into a ByteString
+ ByteString.Output stream = ByteString.newOutput();
+ elemCoder.encode(value, stream, Coder.Context.OUTER);
+ ByteString data = stream.toByteString();
+
+ checkpointBuilder.addValueBuilder()
+ .setTag(stateKey)
+ .setData(coder)
+ .setData(data);
+ }
+ }
+
+ public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+ ByteString valueContent = checkpointReader.getData();
+ T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
+ write(outValue);
+ }
+ }
+
+ private final class FlinkWatermarkHoldStateImpl<W extends BoundedWindow>
+ implements WatermarkHoldState<W>, CheckpointableIF {
+
+ private final ByteString stateKey;
+
+ private Instant minimumHold = null;
+
+ private OutputTimeFn<? super W> outputTimeFn;
+
+ public FlinkWatermarkHoldStateImpl(ByteString stateKey, OutputTimeFn<? super W> outputTimeFn) {
+ this.stateKey = stateKey;
+ this.outputTimeFn = outputTimeFn;
+ }
+
+ @Override
+ public void clear() {
+ // Even though we're clearing we can't remove this from the in-memory state map, since
+ // other users may already have a handle on this WatermarkBagInternal.
+ minimumHold = null;
+ watermarkHoldAccessor = null;
+ }
+
+ @Override
+ public void add(Instant watermarkHold) {
+ if (minimumHold == null || minimumHold.isAfter(watermarkHold)) {
+ watermarkHoldAccessor = watermarkHold;
+ minimumHold = watermarkHold;
+ }
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public Boolean read() {
+ return minimumHold == null;
+ }
+
+ @Override
+ public ReadableState<Boolean> readLater() {
+ // Ignore
+ return this;
+ }
+ };
+ }
+
+ @Override
+ public OutputTimeFn<? super W> getOutputTimeFn() {
+ return outputTimeFn;
+ }
+
+ @Override
+ public Instant read() {
+ return minimumHold;
+ }
+
+ @Override
+ public WatermarkHoldState<W> readLater() {
+ // Ignore
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toString(minimumHold);
+ }
+
+ @Override
+ public boolean shouldPersist() {
+ return minimumHold != null;
+ }
+
+ @Override
+ public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+ if (minimumHold != null) {
+ checkpointBuilder.addWatermarkHoldsBuilder()
+ .setTag(stateKey)
+ .setTimestamp(minimumHold);
+ }
+ }
+
+ public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+ Instant watermark = checkpointReader.getTimestamp();
+ add(watermark);
+ }
+ }
+
+
+ private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withContext(
+ final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+ return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
+ @Override
+ public AccumT createAccumulator(K key, CombineWithContext.Context c) {
+ return combineFn.createAccumulator(key);
+ }
+
+ @Override
+ public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
+ return combineFn.addInput(key, accumulator, value);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
+ return combineFn.mergeAccumulators(key, accumulators);
+ }
+
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
+ return combineFn.extractOutput(key, accumulator);
+ }
+ };
+ }
+
+ private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withKeyAndContext(
+ final Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+ return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
+ @Override
+ public AccumT createAccumulator(K key, CombineWithContext.Context c) {
+ return combineFn.createAccumulator();
+ }
+
+ @Override
+ public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
+ return combineFn.addInput(accumulator, value);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
+ return combineFn.mergeAccumulators(accumulators);
+ }
+
+ @Override
+ public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
+ return combineFn.extractOutput(accumulator);
+ }
+ };
+ }
+
+ private final class FlinkInMemoryKeyedCombiningValue<InputT, AccumT, OutputT>
+ implements AccumulatorCombiningState<InputT, AccumT, OutputT>, CheckpointableIF {
+
+ private final ByteString stateKey;
+ private final CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn;
+ private final Coder<AccumT> accumCoder;
+ private final CombineWithContext.Context context;
+
+ private AccumT accum = null;
+ private boolean isClear = true;
+
+ private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
+ Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
+ Coder<AccumT> accumCoder,
+ final StateContext<?> stateContext) {
+ this(stateKey, withKeyAndContext(combineFn), accumCoder, stateContext);
+ }
+
+
+ private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
+ Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
+ Coder<AccumT> accumCoder,
+ final StateContext<?> stateContext) {
+ this(stateKey, withContext(combineFn), accumCoder, stateContext);
+ }
+
+ private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
+ CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn,
+ Coder<AccumT> accumCoder,
+ final StateContext<?> stateContext) {
+ Preconditions.checkNotNull(combineFn);
+ Preconditions.checkNotNull(accumCoder);
+
+ this.stateKey = stateKey;
+ this.combineFn = combineFn;
+ this.accumCoder = accumCoder;
+ this.context = new CombineWithContext.Context() {
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return stateContext.getPipelineOptions();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return stateContext.sideInput(view);
+ }
+ };
+ accum = combineFn.createAccumulator(key, context);
+ }
+
+ @Override
+ public void clear() {
+ accum = combineFn.createAccumulator(key, context);
+ isClear = true;
+ }
+
+ @Override
+ public void add(InputT input) {
+ isClear = false;
+ accum = combineFn.addInput(key, accum, input, context);
+ }
+
+ @Override
+ public AccumT getAccum() {
+ return accum;
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public ReadableState<Boolean> readLater() {
+ // Ignore
+ return this;
+ }
+
+ @Override
+ public Boolean read() {
+ return isClear;
+ }
+ };
+ }
+
+ @Override
+ public void addAccum(AccumT accum) {
+ isClear = false;
+ this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum), context);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+ return combineFn.mergeAccumulators(key, accumulators, context);
+ }
+
+ @Override
+ public OutputT read() {
+ return combineFn.extractOutput(key, accum, context);
+ }
+
+ @Override
+ public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+ // Ignore
+ return this;
+ }
+
+ @Override
+ public boolean shouldPersist() {
+ return !isClear;
+ }
+
+ @Override
+ public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+ if (!isClear) {
+ // serialize the coder.
+ byte[] coder = InstantiationUtil.serializeObject(accumCoder);
+
+ // serialize the combiner.
+ byte[] combiner = InstantiationUtil.serializeObject(combineFn);
+
+ // encode the accumulator into a ByteString
+ ByteString.Output stream = ByteString.newOutput();
+ accumCoder.encode(accum, stream, Coder.Context.OUTER);
+ ByteString data = stream.toByteString();
+
+ // put the flag that the next serialized element is an accumulator
+ checkpointBuilder.addAccumulatorBuilder()
+ .setTag(stateKey)
+ .setData(coder)
+ .setData(combiner)
+ .setData(data);
+ }
+ }
+
+ public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+ ByteString valueContent = checkpointReader.getData();
+ AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
+ addAccum(accum);
+ }
+ }
+
+ private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF {
+ private final List<T> contents = new ArrayList<>();
+
+ private final ByteString stateKey;
+ private final Coder<T> elemCoder;
+
+ public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) {
+ this.stateKey = stateKey;
+ this.elemCoder = elemCoder;
+ }
+
+ @Override
+ public void clear() {
+ contents.clear();
+ }
+
+ @Override
+ public Iterable<T> read() {
+ return contents;
+ }
+
+ @Override
+ public BagState<T> readLater() {
+ // Ignore
+ return this;
+ }
+
+ @Override
+ public void add(T input) {
+ contents.add(input);
+ }
+
+ @Override
+ public ReadableState<Boolean> isEmpty() {
+ return new ReadableState<Boolean>() {
+ @Override
+ public ReadableState<Boolean> readLater() {
+ // Ignore
+ return this;
+ }
+
+ @Override
+ public Boolean read() {
+ return contents.isEmpty();
+ }
+ };
+ }
+
+ @Override
+ public boolean shouldPersist() {
+ return !contents.isEmpty();
+ }
+
+ @Override
+ public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+ if (!contents.isEmpty()) {
+ // serialize the coder.
+ byte[] coder = InstantiationUtil.serializeObject(elemCoder);
+
+ checkpointBuilder.addListUpdatesBuilder()
+ .setTag(stateKey)
+ .setData(coder)
+ .writeInt(contents.size());
+
+ for (T item : contents) {
+ // encode the element
+ ByteString.Output stream = ByteString.newOutput();
+ elemCoder.encode(item, stream, Coder.Context.OUTER);
+ ByteString data = stream.toByteString();
+
+ // add the data to the checkpoint.
+ checkpointBuilder.setData(data);
+ }
+ }
+ }
+
+ public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+ int noOfValues = checkpointReader.getInt();
+ for (int j = 0; j < noOfValues; j++) {
+ ByteString valueContent = checkpointReader.getData();
+ T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
+ add(outValue);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
index ba8ef89..753309e 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
@@ -25,65 +25,65 @@ import java.util.concurrent.TimeUnit;
public class StateCheckpointReader {
- private final DataInputView input;
-
- public StateCheckpointReader(DataInputView in) {
- this.input = in;
- }
-
- public ByteString getTag() throws IOException {
- return ByteString.copyFrom(readRawData());
- }
-
- public String getTagToString() throws IOException {
- return input.readUTF();
- }
-
- public ByteString getData() throws IOException {
- return ByteString.copyFrom(readRawData());
- }
-
- public int getInt() throws IOException {
- validate();
- return input.readInt();
- }
-
- public byte getByte() throws IOException {
- validate();
- return input.readByte();
- }
-
- public Instant getTimestamp() throws IOException {
- validate();
- Long watermarkMillis = input.readLong();
- return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMillis));
- }
-
- public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException {
- return deserializeObject(keySerializer);
- }
-
- public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException {
- return objectSerializer.deserialize(input);
- }
-
- ///////// Helper Methods ///////
-
- private byte[] readRawData() throws IOException {
- validate();
- int size = input.readInt();
-
- byte[] serData = new byte[size];
- int bytesRead = input.read(serData);
- if (bytesRead != size) {
- throw new RuntimeException("Error while deserializing checkpoint. Not enough bytes in the input stream.");
- }
- return serData;
- }
-
- private void validate() {
- if (this.input == null) {
- throw new RuntimeException("StateBackend not initialized yet.");
- }
- }
+ private final DataInputView input;
+
+ public StateCheckpointReader(DataInputView in) {
+ this.input = in;
+ }
+
+ public ByteString getTag() throws IOException {
+ return ByteString.copyFrom(readRawData());
+ }
+
+ public String getTagToString() throws IOException {
+ return input.readUTF();
+ }
+
+ public ByteString getData() throws IOException {
+ return ByteString.copyFrom(readRawData());
+ }
+
+ public int getInt() throws IOException {
+ validate();
+ return input.readInt();
+ }
+
+ public byte getByte() throws IOException {
+ validate();
+ return input.readByte();
+ }
+
+ public Instant getTimestamp() throws IOException {
+ validate();
+ Long watermarkMillis = input.readLong();
+ return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMillis));
+ }
+
+ public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException {
+ return deserializeObject(keySerializer);
+ }
+
+ public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException {
+ return objectSerializer.deserialize(input);
+ }
+
+ ///////// Helper Methods ///////
+
+ private byte[] readRawData() throws IOException {
+ validate();
+ int size = input.readInt();
+
+ byte[] serData = new byte[size];
+ int bytesRead = input.read(serData);
+ if (bytesRead != size) {
+ throw new RuntimeException("Error while deserializing checkpoint. Not enough bytes in the input stream.");
+ }
+ return serData;
+ }
+
+ private void validate() {
+ if (this.input == null) {
+ throw new RuntimeException("StateBackend not initialized yet.");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
index cd85163..1741829 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
@@ -34,120 +34,120 @@ import java.util.Set;
public class StateCheckpointUtils {
- public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals,
- StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException {
- CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-
- int noOfKeys = perKeyStateInternals.size();
- writer.writeInt(noOfKeys);
- for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) {
- K key = keyStatePair.getKey();
- FlinkStateInternals<K> state = keyStatePair.getValue();
-
- // encode the key
- writer.serializeKey(key, keySerializer);
-
- // write the associated state
- state.persistState(writer);
- }
- }
-
- public static <K> Map<K, FlinkStateInternals<K>> decodeState(
- StateCheckpointReader reader,
- OutputTimeFn<? super BoundedWindow> outputTimeFn,
- Coder<K> keyCoder,
- Coder<? extends BoundedWindow> windowCoder,
- ClassLoader classLoader) throws IOException, ClassNotFoundException {
-
- int noOfKeys = reader.getInt();
- Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys);
- perKeyStateInternals.clear();
-
- CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
- for (int i = 0; i < noOfKeys; i++) {
-
- // decode the key.
- K key = reader.deserializeKey(keySerializer);
-
- //decode the state associated to the key.
- FlinkStateInternals<K> stateForKey =
- new FlinkStateInternals<>(key, keyCoder, windowCoder, outputTimeFn);
- stateForKey.restoreState(reader, classLoader);
- perKeyStateInternals.put(key, stateForKey);
- }
- return perKeyStateInternals;
- }
-
- ////////////// Encoding/Decoding the Timers ////////////////
-
-
- public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers,
- StateCheckpointWriter writer,
- Coder<K> keyCoder) throws IOException {
- CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-
- int noOfKeys = allTimers.size();
- writer.writeInt(noOfKeys);
- for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) {
- K key = timersPerKey.getKey();
-
- // encode the key
- writer.serializeKey(key, keySerializer);
-
- // write the associated timers
- Set<TimerInternals.TimerData> timers = timersPerKey.getValue();
- encodeTimerDataForKey(writer, timers);
- }
- }
-
- public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers(
- StateCheckpointReader reader,
- Coder<? extends BoundedWindow> windowCoder,
- Coder<K> keyCoder) throws IOException {
-
- int noOfKeys = reader.getInt();
- Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys);
- activeTimers.clear();
-
- CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
- for (int i = 0; i < noOfKeys; i++) {
-
- // decode the key.
- K key = reader.deserializeKey(keySerializer);
-
- // decode the associated timers.
- Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder);
- activeTimers.put(key, timers);
- }
- return activeTimers;
- }
-
- private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException {
- // encode timers
- writer.writeInt(timers.size());
- for (TimerInternals.TimerData timer : timers) {
- String stringKey = timer.getNamespace().stringKey();
-
- writer.setTag(stringKey);
- writer.setTimestamp(timer.getTimestamp());
- writer.writeInt(timer.getDomain().ordinal());
- }
- }
-
- private static Set<TimerInternals.TimerData> decodeTimerDataForKey(
- StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException {
-
- // decode the timers: first their number and then the content itself.
- int noOfTimers = reader.getInt();
- Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers);
- for (int i = 0; i < noOfTimers; i++) {
- String stringKey = reader.getTagToString();
- Instant instant = reader.getTimestamp();
- TimeDomain domain = TimeDomain.values()[reader.getInt()];
-
- StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder);
- timers.add(TimerInternals.TimerData.of(namespace, instant, domain));
- }
- return timers;
- }
+ public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals,
+ StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException {
+ CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+
+ int noOfKeys = perKeyStateInternals.size();
+ writer.writeInt(noOfKeys);
+ for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) {
+ K key = keyStatePair.getKey();
+ FlinkStateInternals<K> state = keyStatePair.getValue();
+
+ // encode the key
+ writer.serializeKey(key, keySerializer);
+
+ // write the associated state
+ state.persistState(writer);
+ }
+ }
+
+ public static <K> Map<K, FlinkStateInternals<K>> decodeState(
+ StateCheckpointReader reader,
+ OutputTimeFn<? super BoundedWindow> outputTimeFn,
+ Coder<K> keyCoder,
+ Coder<? extends BoundedWindow> windowCoder,
+ ClassLoader classLoader) throws IOException, ClassNotFoundException {
+
+ int noOfKeys = reader.getInt();
+ Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys);
+ perKeyStateInternals.clear();
+
+ CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+ for (int i = 0; i < noOfKeys; i++) {
+
+ // decode the key.
+ K key = reader.deserializeKey(keySerializer);
+
+ //decode the state associated to the key.
+ FlinkStateInternals<K> stateForKey =
+ new FlinkStateInternals<>(key, keyCoder, windowCoder, outputTimeFn);
+ stateForKey.restoreState(reader, classLoader);
+ perKeyStateInternals.put(key, stateForKey);
+ }
+ return perKeyStateInternals;
+ }
+
+ ////////////// Encoding/Decoding the Timers ////////////////
+
+
+ public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers,
+ StateCheckpointWriter writer,
+ Coder<K> keyCoder) throws IOException {
+ CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+
+ int noOfKeys = allTimers.size();
+ writer.writeInt(noOfKeys);
+ for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) {
+ K key = timersPerKey.getKey();
+
+ // encode the key
+ writer.serializeKey(key, keySerializer);
+
+ // write the associated timers
+ Set<TimerInternals.TimerData> timers = timersPerKey.getValue();
+ encodeTimerDataForKey(writer, timers);
+ }
+ }
+
+ public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers(
+ StateCheckpointReader reader,
+ Coder<? extends BoundedWindow> windowCoder,
+ Coder<K> keyCoder) throws IOException {
+
+ int noOfKeys = reader.getInt();
+ Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys);
+ activeTimers.clear();
+
+ CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+ for (int i = 0; i < noOfKeys; i++) {
+
+ // decode the key.
+ K key = reader.deserializeKey(keySerializer);
+
+ // decode the associated timers.
+ Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder);
+ activeTimers.put(key, timers);
+ }
+ return activeTimers;
+ }
+
+ private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException {
+ // encode timers
+ writer.writeInt(timers.size());
+ for (TimerInternals.TimerData timer : timers) {
+ String stringKey = timer.getNamespace().stringKey();
+
+ writer.setTag(stringKey);
+ writer.setTimestamp(timer.getTimestamp());
+ writer.writeInt(timer.getDomain().ordinal());
+ }
+ }
+
+ private static Set<TimerInternals.TimerData> decodeTimerDataForKey(
+ StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException {
+
+ // decode the timers: first their number and then the content itself.
+ int noOfTimers = reader.getInt();
+ Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers);
+ for (int i = 0; i < noOfTimers; i++) {
+ String stringKey = reader.getTagToString();
+ Instant instant = reader.getTimestamp();
+ TimeDomain domain = TimeDomain.values()[reader.getInt()];
+
+ StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder);
+ timers.add(TimerInternals.TimerData.of(namespace, instant, domain));
+ }
+ return timers;
+ }
}