You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by gyfora <gi...@git.apache.org> on 2015/10/27 18:33:25 UTC

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

GitHub user gyfora opened a pull request:

    https://github.com/apache/flink/pull/1305

    Out-of-core state backend for JDBC databases

    Detailed description incoming...

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gyfora/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1305.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1305
    
----
commit b793bca20b79c1fe38ed7a31deca485e7d109060
Author: Gyula Fora <gy...@apache.org>
Date:   2015-10-26T08:58:49Z

    [FLINK-2916] [streaming] Expose operator and task information to StateBackend

commit c35949f5e765f377799730a973b374eeea9c3921
Author: Gyula Fora <gy...@apache.org>
Date:   2015-10-27T17:31:04Z

    [FLINK-2924] [streaming] Out-of-core state backend for JDBC databases

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45473713
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
     	/**
     	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
     	 * checkpoint data.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions can be forwarded and will be logged by the system
     	 */
     	public abstract void close() throws Exception;
    -	
    +
     	// ------------------------------------------------------------------------
     	//  key/value state
     	// ------------------------------------------------------------------------
     
     	/**
     	 * Creates a key/value state backed by this state backend.
    -	 * 
    +	 *
    +	 * @param operatorId Unique id for the operator creating the state
    +	 * @param stateName Name of the created state
     	 * @param keySerializer The serializer for the key.
     	 * @param valueSerializer The serializer for the value.
     	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
     	 * @param <K> The type of the key.
     	 * @param <V> The type of the value.
    -	 * 
    +	 *
     	 * @return A new key/value state backed by this backend.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
     	 */
    -	public abstract <K, V> KvState<K, V, Backend> createKvState(
    +	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
    --- End diff --
    
    You might be right that state name and operator id is too api specific, but we will need ways to globally identify states which is impossible without that I think currently.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153351332
  
    @gyfora That's what I meant, basically the timestamps could subsume the role of the checkpointIds. I.e. The checkpointIds have the semantics of the timestamps and the timestamps would not be required. (Or the checkpointId would be removed and timestamps remain, depends on how you look at it... :smile: )



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45377370
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
     	/**
     	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
     	 * checkpoint data.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions can be forwarded and will be logged by the system
     	 */
     	public abstract void close() throws Exception;
    -	
    +
     	// ------------------------------------------------------------------------
     	//  key/value state
     	// ------------------------------------------------------------------------
     
     	/**
     	 * Creates a key/value state backed by this state backend.
    -	 * 
    +	 *
    +	 * @param operatorId Unique id for the operator creating the state
    +	 * @param stateName Name of the created state
     	 * @param keySerializer The serializer for the key.
     	 * @param valueSerializer The serializer for the value.
     	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
     	 * @param <K> The type of the key.
     	 * @param <V> The type of the value.
    -	 * 
    +	 *
     	 * @return A new key/value state backed by this backend.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
     	 */
    -	public abstract <K, V> KvState<K, V, Backend> createKvState(
    +	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
    --- End diff --
    
    Let me first describe how sharding works than I will give a concrete example. 
    Key-Value pairs are sharded by key not by the subtask. This means that each parallel subtask maintains a connection to all the shards and partitions the states before writing them to the appropriate shards according to the user defined partitioner (in the backend config). This is much better than sharding by subtask because we can later change the parallelism of the job without affecting the state and also lets us defined a more elaborate sharding strategy through the partitioner.
    
    This means, when a kv state is created we create a table for that kvstate in each shard. If we would do it according to your suggestion we would need to create numShards number of tables for each parallel instance (total of p*ns) for each kvstate. Furthermore this makes the fancy sharding useless because we cannot change the job parallelism. So we need to make sure that parallel subtasks of a given operator write to the same state tables (so we only have ns number of tables regardless of the parallelism).
    
    In order to do this we need something that uniqely identifies a given state in the streaming program (and parallel instances should have the same id).
    
    The information required to create such unique state id is an identifier for the operator that has the state + the name of the state. (The information obtained from the environment is not enough because chained operators have the same environment, therefore if they have conflicting state names the id is not unique). The only thing that identifies an operator in the logical streaming program is the operator id assigned by the jobgraphbuilder (thats the whole point of having it). 
    
    An example job with p=2 and numshards = 3:
    
    chained map -> filter, both the mapper and filter has a state named "count", and let's assume that mapper has opid 1 and filter 2.
    
    In this case the mapper would create 3 db tables (1 on each shard) with the same name kvstate_count_1_jobId. The filter would also create 3 tables with names: kvstate_count_2_jobId
    
    All mapper instances would write to all three database shards, and the same goes for all the filters.
    
    I hope you get what I am trying to say. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153337412
  
    You are right about the checkpointIDs being ignored. I don't see why we need to bother with changing the id semantics, the timestamps also serve as checkpoint Ids perfectly (in fact we could drop the checkpoint ID everywhere in Flink and just keep the timestamp).
    
    The good thing about timestamps is that they are not incremental, meaning that if the first checkpoint have ts = 100 and the second has ts = 5100 then we can write the intermediate updates with ts 101, 102 ... while maintaining uniqueness and monotonicity. This is something that we use here and that is why I am using timestamps instead of ids.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-156964684
  
    I'm looking at it again. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1305


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45371097
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
     	/**
     	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
     	 * checkpoint data.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions can be forwarded and will be logged by the system
     	 */
     	public abstract void close() throws Exception;
    -	
    +
     	// ------------------------------------------------------------------------
     	//  key/value state
     	// ------------------------------------------------------------------------
     
     	/**
     	 * Creates a key/value state backed by this state backend.
    -	 * 
    +	 *
    +	 * @param operatorId Unique id for the operator creating the state
    +	 * @param stateName Name of the created state
     	 * @param keySerializer The serializer for the key.
     	 * @param valueSerializer The serializer for the value.
     	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
     	 * @param <K> The type of the key.
     	 * @param <V> The type of the value.
    -	 * 
    +	 *
     	 * @return A new key/value state backed by this backend.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
     	 */
    -	public abstract <K, V> KvState<K, V, Backend> createKvState(
    +	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
    --- End diff --
    
    I am not completely sure what you mean here.
    
    Multiple different states can have the same name in different tasks. As far as I know we dont assume unique state names. This gets worse if the chained tasks have states with the same name then they actually go to the same backend as well.
    
    I dont see how to go around this without an operator id. Could you please clarify your idea?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45372620
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
     	/**
     	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
     	 * checkpoint data.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions can be forwarded and will be logged by the system
     	 */
     	public abstract void close() throws Exception;
    -	
    +
     	// ------------------------------------------------------------------------
     	//  key/value state
     	// ------------------------------------------------------------------------
     
     	/**
     	 * Creates a key/value state backed by this state backend.
    -	 * 
    +	 *
    +	 * @param operatorId Unique id for the operator creating the state
    +	 * @param stateName Name of the created state
     	 * @param keySerializer The serializer for the key.
     	 * @param valueSerializer The serializer for the value.
     	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
     	 * @param <K> The type of the key.
     	 * @param <V> The type of the value.
    -	 * 
    +	 *
     	 * @return A new key/value state backed by this backend.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
     	 */
    -	public abstract <K, V> KvState<K, V, Backend> createKvState(
    +	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
    --- End diff --
    
    What you mention depends on the parallel subtask ID (which is already given in the initialize() method). The operatorId and name are the same for all parallel instances anyways.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45479621
  
    --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *	http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.concurrent.Callable;
    +
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.eclipse.jetty.util.log.Log;
    +
    +/**
    + * State handle implementation for storing checkpoints as byte arrays in
    + * databases using the {@link MySqlAdapter} defined in the {@link DbBackendConfig}.
    + * 
    + */
    +public class DbStateHandle<S> implements Serializable, StateHandle<S> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private final String jobId;
    +	private final DbBackendConfig dbConfig;
    +
    +	private final long checkpointId;
    +	private final long checkpointTs;
    +
    +	private final long handleId;
    +
    +	public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) {
    +		this.checkpointId = checkpointId;
    +		this.handleId = handleId;
    +		this.jobId = jobId;
    +		this.dbConfig = dbConfig;
    +		this.checkpointTs = checkpointTs;
    +	}
    +
    +	protected byte[] getBytes() throws IOException {
    +		return retry(new Callable<byte[]>() {
    +			public byte[] call() throws Exception {
    +				try (ShardedConnection con = dbConfig.createShardedConnection()) {
    +					return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
    +				}
    +			}
    +		}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
    +	}
    +
    +	@Override
    +	public void discardState() {
    +		try {
    +			retry(new Callable<Boolean>() {
    +				public Boolean call() throws Exception {
    +					try (ShardedConnection con = dbConfig.createShardedConnection()) {
    +						dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
    +					}
    +					return true;
    +				}
    +			}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
    +		} catch (IOException e) {
    +			// We don't want to fail the job here, but log the error.
    +			if (Log.isDebugEnabled()) {
    --- End diff --
    
    I think you accidentally used Jetty's logging here (see `import org.eclipse.jetty.util.log.Log`)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45362408
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
     	/**
     	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
     	 * checkpoint data.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions can be forwarded and will be logged by the system
     	 */
     	public abstract void close() throws Exception;
    -	
    +
     	// ------------------------------------------------------------------------
     	//  key/value state
     	// ------------------------------------------------------------------------
     
     	/**
     	 * Creates a key/value state backed by this state backend.
    -	 * 
    +	 *
    +	 * @param operatorId Unique id for the operator creating the state
    +	 * @param stateName Name of the created state
     	 * @param keySerializer The serializer for the key.
     	 * @param valueSerializer The serializer for the value.
     	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
     	 * @param <K> The type of the key.
     	 * @param <V> The type of the value.
    -	 * 
    +	 *
     	 * @return A new key/value state backed by this backend.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
     	 */
    -	public abstract <K, V> KvState<K, V, Backend> createKvState(
    +	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
    --- End diff --
    
    I would like to get rid of this change and simply let the state backend create a UID for the state name.
    
    This method is called one per proper creation of a state (so it should not need deterministic state naming). Recovery happens from the state handle, which can store all required info.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-152767697
  
    My last commit (that was meant to solve the problems with failed tasks writing to the db) introduced some issues with the exactly once guarantees. I will look into it tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-158799081
  
    @StephanEwen, @rmetzger:
    I addressed the comments regarding the logs and the state id.
    
    I also added a final improvement:
    
    -Now compaction is executed in a background thread using a SingleThreadedExecutor
    -At empty checkpoints a keepalive call is executed against the connections to avoid connection drops
    
    These changes are in the last 2 commits, so if you guys +1 these last modifications I will merge it. I guess the compaction part is the most interesting here.
    
    thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-159048039
  
    If no objections I would like to merge this :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r46264229
  
    --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *	http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.state.StateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
    +
    +/**
    + * {@link StateBackend} for storing checkpoints in JDBC supporting databases.
    + * Key-Value state is stored out-of-core and is lazily fetched using the
    + * {@link LazyDbKvState} implementation. A different backend can also be
    + * provided in the constructor to store the non-partitioned states. A common use
    + * case would be to store the key-value states in the database and store larger
    + * non-partitioned states on a distributed file system.
    + * <p>
    + * This backend implementation also allows the sharding of the checkpointed
    + * states among multiple database instances, which can be enabled by passing
    + * multiple database urls to the {@link DbBackendConfig} instance.
    + * <p>
    + * By default there are multiple tables created in the given databases: 1 table
    + * for non-partitioned checkpoints and 1 table for each key-value state in the
    + * streaming program.
    + * <p>
    + * To control table creation, insert/lookup operations and to provide
    + * compatibility for different SQL implementations, a custom
    + * {@link MySqlAdapter} can be supplied in the {@link DbBackendConfig}.
    + *
    + */
    +public class DbStateBackend extends StateBackend<DbStateBackend> {
    --- End diff --
    
    `StateBackend` implements the `Serializable` interface. Does this mean that `DbStateBackend` must also be `Serializable`? If this is the case, then this condition is violated because `env` and `insertStatement` are not serializable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r46264128
  
    --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *	http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.state.StateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
    +
    +/**
    + * {@link StateBackend} for storing checkpoints in JDBC supporting databases.
    + * Key-Value state is stored out-of-core and is lazily fetched using the
    + * {@link LazyDbKvState} implementation. A different backend can also be
    + * provided in the constructor to store the non-partitioned states. A common use
    + * case would be to store the key-value states in the database and store larger
    + * non-partitioned states on a distributed file system.
    + * <p>
    + * This backend implementation also allows the sharding of the checkpointed
    + * states among multiple database instances, which can be enabled by passing
    + * multiple database urls to the {@link DbBackendConfig} instance.
    + * <p>
    + * By default there are multiple tables created in the given databases: 1 table
    + * for non-partitioned checkpoints and 1 table for each key-value state in the
    + * streaming program.
    + * <p>
    + * To control table creation, insert/lookup operations and to provide
    + * compatibility for different SQL implementations, a custom
    + * {@link MySqlAdapter} can be supplied in the {@link DbBackendConfig}.
    + *
    + */
    +public class DbStateBackend extends StateBackend<DbStateBackend> {
    +
    +	private static final long serialVersionUID = 1L;
    +	private static final Logger LOG = LoggerFactory.getLogger(DbStateBackend.class);
    +
    +	private Random rnd;
    +
    +	// ------------------------------------------------------
    +
    +	private Environment env;
    +
    +	// ------------------------------------------------------
    +
    +	private final DbBackendConfig dbConfig;
    +	private final DbAdapter dbAdapter;
    +
    +	private ShardedConnection connections;
    +
    +	private final int numSqlRetries;
    +	private final int sqlRetrySleep;
    +
    +	private PreparedStatement insertStatement;
    --- End diff --
    
    `PreparedStatement` is not serializable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45544375
  
    --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *	http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.concurrent.Callable;
    +
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.eclipse.jetty.util.log.Log;
    +
    +/**
    + * State handle implementation for storing checkpoints as byte arrays in
    + * databases using the {@link MySqlAdapter} defined in the {@link DbBackendConfig}.
    + * 
    + */
    +public class DbStateHandle<S> implements Serializable, StateHandle<S> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private final String jobId;
    +	private final DbBackendConfig dbConfig;
    +
    +	private final long checkpointId;
    +	private final long checkpointTs;
    +
    +	private final long handleId;
    +
    +	public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) {
    +		this.checkpointId = checkpointId;
    +		this.handleId = handleId;
    +		this.jobId = jobId;
    +		this.dbConfig = dbConfig;
    +		this.checkpointTs = checkpointTs;
    +	}
    +
    +	protected byte[] getBytes() throws IOException {
    +		return retry(new Callable<byte[]>() {
    +			public byte[] call() throws Exception {
    +				try (ShardedConnection con = dbConfig.createShardedConnection()) {
    +					return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
    +				}
    +			}
    +		}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
    +	}
    +
    +	@Override
    +	public void discardState() {
    +		try {
    +			retry(new Callable<Boolean>() {
    +				public Boolean call() throws Exception {
    +					try (ShardedConnection con = dbConfig.createShardedConnection()) {
    +						dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
    +					}
    +					return true;
    +				}
    +			}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
    +		} catch (IOException e) {
    +			// We don't want to fail the job here, but log the error.
    +			if (Log.isDebugEnabled()) {
    --- End diff --
    
    We could add a checkstyle rule for that, but I would like to solve the problem in a different way: I recently opened a JIRA for checking whether a Flink module is only using dependencies it has explicitly declared (forbidding to rely on transitive dependencies). WIth that check, we would also identify cases like this one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153307495
  
    @aljoscha 
     1. I was initially using the MockEnvironments but I added the DummyEnvironment for several reasons: I wanted control over the JobId and the number of subtasks for which I would have changed the MockEnvironment. Also I wanted to avoid having to clean up the memorymanager and other resources as I really don't need them
     2. I don't really understand what you mean here, the recovery timestamp is only used for cleanup on restore
     3. Imagine a scenario where 2 task are restoring . 1 restores quickly and starts writing new timestamps. If we call cleanup on the other task it will remove the new states if we don't bound by recovery timestamp. This can happen easily.
    
    I don't know about the allOrNothingState :/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153500173
  
    I am now writing a prototype for this version, the batch insert got pretty ugly...
    I will probably finish it tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153289042
  
    Cool stuff, really! This is very much in line with what I had in mind for a SQL backend.
    
    Let me check if I understood everything correct (and see where my understanding is wrong), because I think we should be able to make an "exactly once" version of this based that mechanism. I am basically rephrasing what you describe in a different model.
    
    ### Basic Mode
    
    What this is effectively doing is a batched and asynchronous version of distributed 2-phase commit transactions. The phases look basically like this:
    
      - **Adding data**: Pipe all modifications into the database, but not commit the transaction. They are tagged with the timestamp of the upcoming checkpoint (or any coordinated increasing version counter). This can happen in the background thread, for as long as the in-operator cache holds all edits that are not in the database yet.
    
      - **Pre-commit**: This is when the checkpoint is triggered. All pending edits are written into the database and then the transaction is committed. The state handle only includes the timestamp used on the elements. In the classical 2-phase transactions, after a task acks the pre-commit, it has to be able to recover to that state, which is given here. The checkpoint is not immediately valid for recovery though, which means that recovery has to have either a filter, or issue a query that deletes all records with timestamps larger than the version given during recovery. After the pre-commit, the timestamp is locally incremented and work can continue.
    
      - **Full commit**: This happens implicitly when the checkpoint coordinator marks the checkpoint as complete.
    
      - **Recovery**: The timestamp (or version counter) of the last successful checkpoint is restored, the deletion of records that were committed (but where the checkpoint did not succeed as a whole) happens, then records are lazily fetched. 
    
    So far, this should give exactly once guarantees, or am I overlooking something?
    
    ### Compacting
    
    Whenever the "checkpoint complete" notification comes (or every so many changes) you trigger a clean-up query in the background. Given that the SQL database has a not completely terrible query planner, this SQL statement would be okay efficient (single semi join).
    ```
    DELETE FROM "table name" t1
    WHERE EXISTS 
      (SELECT *
         FROM "table name" t2
        WHERE t2.handle_id = t1.handle_id
          AND t2.timestamp > t1.timestamp    //-- a newer version exists for the same handle
          AND t2.timestamp <= GLOBAL_VERSION //-- and the newer version is globally committed
      )
    ```
    The good thing is that by virtue of having the incrementing global versions, we can set the isolation level for the query to "read uncommitted", which means that it will not lock anything and thus not compete with any other ongoing modifications.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153695943
  
    I updated this PR with the reworked logic, it has several advantages over the previous timestamp based solution (including the elimination of transactions from the logic).
    
    The only problem I see is that the derby batch inserts happen row by row currently as it does not have the insert or update semantics.
    
    This rework will also make it easier to write connectors to non transactional stores like Cassandra


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-156038439
  
    I agree that this will be an important backend and good to have in. :) But do we need to push this right now? I think we should wait a little and make sure that it fits well into the other ongoing changes I think Stephan and Aljoscha are working on. At least let's wait for someone to review it again. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153376152
  
    My last commit introduces automatic compaction with user specified frequency. It also allows the KvStates to implement the CheckpointNotifier interface in which case they will also get notified.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-158074782
  
    Looking though this again...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r46264109
  
    --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *	http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.state.StateBackend;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
    +
    +/**
    + * {@link StateBackend} for storing checkpoints in JDBC supporting databases.
    + * Key-Value state is stored out-of-core and is lazily fetched using the
    + * {@link LazyDbKvState} implementation. A different backend can also be
    + * provided in the constructor to store the non-partitioned states. A common use
    + * case would be to store the key-value states in the database and store larger
    + * non-partitioned states on a distributed file system.
    + * <p>
    + * This backend implementation also allows the sharding of the checkpointed
    + * states among multiple database instances, which can be enabled by passing
    + * multiple database urls to the {@link DbBackendConfig} instance.
    + * <p>
    + * By default there are multiple tables created in the given databases: 1 table
    + * for non-partitioned checkpoints and 1 table for each key-value state in the
    + * streaming program.
    + * <p>
    + * To control table creation, insert/lookup operations and to provide
    + * compatibility for different SQL implementations, a custom
    + * {@link MySqlAdapter} can be supplied in the {@link DbBackendConfig}.
    + *
    + */
    +public class DbStateBackend extends StateBackend<DbStateBackend> {
    +
    +	private static final long serialVersionUID = 1L;
    +	private static final Logger LOG = LoggerFactory.getLogger(DbStateBackend.class);
    +
    +	private Random rnd;
    +
    +	// ------------------------------------------------------
    +
    +	private Environment env;
    --- End diff --
    
    `env` is not serializable


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153696208
  
    I also removed the sharding logic now as I think it was pretty weak and not very useful (it maintained 1 connection per subtask which would break if we change parallelism)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-158423218
  
    Had an offline chat with @gyfora with the following outcome:
      - A deterministic state identifier is needed
      - Small change to pass that identifier as a single ID String, initially internally constructed by state name + operator ID (as in this implementation)
      - That way, the streaming runtime can change handling of state names and operator IDs without breaking state backend implementations
    
    With these changes, looks good to merge.
    
    +1 from my side


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1305: Out-of-core state backend for JDBC databases

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/flink/pull/1305
  
    
    [![Coverage Status](https://coveralls.io/builds/13040362/badge)](https://coveralls.io/builds/13040362)
    
    Changes Unknown when pulling **db2a964a450c05cb2aad3843999d994e4b8e5ef5 on gyfora:master** into ** on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45351896
  
    --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java ---
    @@ -0,0 +1,406 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *	http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import java.io.Serializable;
    +import java.sql.SQLException;
    +import java.util.List;
    +
    +import org.apache.flink.contrib.streaming.state.ShardedConnection.Partitioner;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * 
    + * Configuration object for {@link DbStateBackend}, containing information to
    + * shard and connect to the databases that will store the state checkpoints.
    + *
    + */
    +public class DbBackendConfig implements Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	// Database connection properties
    +	private final String userName;
    +	private final String userPassword;
    +	private final List<String> shardUrls;
    +
    +	// JDBC Driver + DbAdapter information
    +	private DbAdapter dbAdapter = new MySqlAdapter();
    +	private String JDBCDriver = null;
    +
    +	private int maxNumberOfSqlRetries = 5;
    +	private int sleepBetweenSqlRetries = 100;
    +
    +	// KvState properties
    +	private int kvStateCacheSize = 10000;
    +	private int maxKvInsertBatchSize = 1000;
    +	private float maxKvEvictFraction = 0.1f;
    +	private int kvStateCompactionFreq = -1;
    +
    +	private Partitioner shardPartitioner;
    +
    +	/**
    +	 * Creates a new sharded database state backend configuration with the given
    +	 * parameters and default {@link MySqlAdapter}.
    +	 * 
    +	 * @param dbUserName
    +	 *            The username used to connect to the database at the given url.
    +	 * @param dbUserPassword
    +	 *            The password used to connect to the database at the given url
    +	 *            and username.
    +	 * @param dbShardUrls
    +	 *            The list of JDBC urls of the databases that will be used as
    +	 *            shards for the state backend. Sharding of the state will
    +	 *            happen based on the subtask index of the given task.
    +	 */
    +	public DbBackendConfig(String dbUserName, String dbUserPassword, List<String> dbShardUrls) {
    +		this.userName = dbUserName;
    +		this.userPassword = dbUserPassword;
    +		this.shardUrls = dbShardUrls;
    +	}
    +
    +	/**
    +	 * Creates a new database state backend configuration with the given
    +	 * parameters and default {@link MySqlAdapter}.
    +	 * 
    +	 * @param dbUserName
    +	 *            The username used to connect to the database at the given url.
    +	 * @param dbUserPassword
    +	 *            The password used to connect to the database at the given url
    +	 *            and username.
    +	 * @param dbUrl
    +	 *            The JDBC url of the database for example
    +	 *            "jdbc:mysql://localhost:3306/flinkdb".
    +	 */
    +	public DbBackendConfig(String dbUserName, String dbUserPassword, String dbUrl) {
    +		this(dbUserName, dbUserPassword, Lists.newArrayList(dbUrl));
    +	}
    +
    +	/**
    +	 * The username used to connect to the database at the given urls.
    +	 */
    +	public String getUserName() {
    +		return userName;
    +	}
    +
    +	/**
    +	 * The password used to connect to the database at the given url and
    +	 * username.
    +	 */
    +	public String getUserPassword() {
    +		return userPassword;
    +	}
    +
    +	/**
    +	 * Number of database shards defined.
    +	 */
    +	public int getNumberOfShards() {
    +		return shardUrls.size();
    +	}
    +
    +	/**
    +	 * Database shard urls as provided in the constructor.
    +	 * 
    +	 */
    +	public List<String> getShardUrls() {
    +		return shardUrls;
    +	}
    +
    +	/**
    +	 * The url of the first shard.
    +	 * 
    +	 */
    +	public String getUrl() {
    +		return getShardUrl(0);
    +	}
    +
    +	/**
    +	 * The url of a specific shard.
    +	 * 
    +	 */
    +	public String getShardUrl(int shardIndex) {
    +		validateShardIndex(shardIndex);
    +		return shardUrls.get(shardIndex);
    +	}
    +
    +	private void validateShardIndex(int i) {
    +		if (i < 0) {
    +			throw new IllegalArgumentException("Index must be positive.");
    +		} else if (getNumberOfShards() <= i) {
    +			throw new IllegalArgumentException("Index must be less then the total number of shards.");
    +		}
    +	}
    +
    +	/**
    +	 * Get the {@link DbAdapter} that will be used to operate on the database
    +	 * during checkpointing.
    +	 * 
    +	 */
    +	public DbAdapter getDbAdapter() {
    +		return dbAdapter;
    +	}
    +
    +	/**
    +	 * Set the {@link DbAdapter} that will be used to operate on the database
    +	 * during checkpointing.
    +	 * 
    +	 */
    +	public void setDbAdapter(DbAdapter adapter) {
    +		this.dbAdapter = adapter;
    +	}
    +
    +	/**
    +	 * The class name that should be used to load the JDBC driver using
    +	 * Class.forName(JDBCDriverClass).
    +	 */
    +	public String getJDBCDriver() {
    +		return JDBCDriver;
    +	}
    +
    +	/**
    +	 * Set the class name that should be used to load the JDBC driver using
    +	 * Class.forName(JDBCDriverClass).
    +	 */
    +	public void setJDBCDriver(String jDBCDriverClassName) {
    +		JDBCDriver = jDBCDriverClassName;
    +	}
    +
    +	/**
    +	 * The maximum number of key-value pairs stored in one task instance's cache
    +	 * before evicting to the underlying database.
    +	 *
    +	 */
    +	public int getKvCacheSize() {
    +		return kvStateCacheSize;
    +	}
    +
    +	/**
    +	 * Set the maximum number of key-value pairs stored in one task instance's
    +	 * cache before evicting to the underlying database. When the cache is full
    +	 * the N least recently used keys will be evicted to the database, where N =
    +	 * maxKvEvictFraction*KvCacheSize.
    +	 *
    +	 */
    +	public void setKvCacheSize(int size) {
    +		kvStateCacheSize = size;
    +	}
    +
    +	/**
    +	 * The maximum number of key-value pairs inserted in the database as one
    +	 * batch operation.
    +	 */
    +	public int getMaxKvInsertBatchSize() {
    +		return maxKvInsertBatchSize;
    +	}
    +
    +	/**
    +	 * Set the maximum number of key-value pairs inserted in the database as one
    +	 * batch operation.
    +	 */
    +	public void setMaxKvInsertBatchSize(int size) {
    +		maxKvInsertBatchSize = size;
    +	}
    +
    +	/**
    +	 * Sets the maximum fraction of key-value states evicted from the cache if
    +	 * the cache is full.
    +	 */
    +	public void setMaxKvCacheEvictFraction(float fraction) {
    +		if (fraction > 1 || fraction <= 0) {
    +			throw new RuntimeException("Must be a number between 0 and 1");
    +		} else {
    +			maxKvEvictFraction = fraction;
    +		}
    +	}
    +
    +	/**
    +	 * The maximum fraction of key-value states evicted from the cache if the
    +	 * cache is full.
    +	 */
    +	public float getMaxKvCacheEvictFraction() {
    +		return maxKvEvictFraction;
    +	}
    +
    +	/**
    +	 * The number of elements that will be evicted when the cache is full.
    +	 * 
    +	 */
    +	public int getNumElementsToEvict() {
    +		return (int) Math.ceil(getKvCacheSize() * getMaxKvCacheEvictFraction());
    +	}
    +
    +	/**
    +	 * Sets how often will automatic compaction be performed on the database to
    +	 * remove old overwritten state changes. The frequency is set in terms of
    +	 * number of successful checkpoints between two compactions and should take
    +	 * the state size and checkpoint frequency into account.
    +	 * <p>
    +	 * By default automatic compaction is turned off.
    +	 */
    +	public void setKvStateCompactionFrequency(int compactEvery) {
    +		this.kvStateCompactionFreq = compactEvery;
    +	}
    +
    +	/**
    +	 * Sets how often will automatic compaction be performed on the database to
    +	 * remove old overwritten state changes. The frequency is set in terms of
    +	 * number of successful checkpoints between two compactions and should take
    +	 * the state size and checkpoint frequency into account.
    +	 * <p>
    +	 * By default automatic compaction is turned off.
    +	 */
    +	public int getKvStateCompactionFrequency() {
    +		return kvStateCompactionFreq;
    +	}
    +
    +	/**
    +	 * The number of times each SQL command will be retried on failure.
    +	 */
    +	public int getMaxNumberOfSqlRetries() {
    +		return maxNumberOfSqlRetries;
    +	}
    +
    +	/**
    +	 * Sets the number of times each SQL command will be retried on failure.
    +	 */
    +	public void setMaxNumberOfSqlRetries(int maxNumberOfSqlRetries) {
    +		this.maxNumberOfSqlRetries = maxNumberOfSqlRetries;
    +	}
    +
    +	/**
    +	 * The number of milliseconds slept between two SQL retries. The actual
    +	 * sleep time will be chosen randomly between 1 and the given time.
    +	 * 
    +	 */
    +	public int getSleepBetweenSqlRetries() {
    +		return sleepBetweenSqlRetries;
    +	}
    +
    +	/**
    +	 * Sets the number of milliseconds slept between two SQL retries. The actual
    +	 * sleep time will be chosen randomly between 1 and the given time.
    +	 * 
    +	 */
    +	public void setSleepBetweenSqlRetries(int sleepBetweenSqlRetries) {
    +		this.sleepBetweenSqlRetries = sleepBetweenSqlRetries;
    +	}
    +
    +	/**
    +	 * Sets the partitioner used to assign keys to different database shards
    +	 */
    +	public void setPartitioner(Partitioner partitioner) {
    +		this.shardPartitioner = partitioner;
    +	}
    +
    +	/**
    +	 * Creates a new {@link ShardedConnection} using the set parameters.
    +	 * 
    +	 * @throws SQLException
    +	 */
    +	public ShardedConnection createShardedConnection() throws SQLException {
    +		if (JDBCDriver != null) {
    +			try {
    +				Class.forName(JDBCDriver);
    +			} catch (ClassNotFoundException e) {
    +				throw new RuntimeException("Could not load JDBC driver class", e);
    +			}
    +		}
    +		if (shardPartitioner == null) {
    +			return new ShardedConnection(shardUrls, userName, userPassword);
    +		} else {
    +			return new ShardedConnection(shardUrls, userName, userPassword, shardPartitioner);
    +		}
    +	}
    +
    +	@Override
    +	public boolean equals(Object obj) {
    --- End diff --
    
    That must have been a mistake I generated it with eclipse :/ I only used it for a test but it doesnt make too much sense so I can remive it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-156034877
  
    I would like to push this soon if no objections


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153872782
  
    I updated the sharding logic to do mod hashing by default on the keys for the number of shards, and the user can also add a custom Partitioner to implement custom sharding strategy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45372569
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
     	/**
     	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
     	 * checkpoint data.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions can be forwarded and will be logged by the system
     	 */
     	public abstract void close() throws Exception;
    -	
    +
     	// ------------------------------------------------------------------------
     	//  key/value state
     	// ------------------------------------------------------------------------
     
     	/**
     	 * Creates a key/value state backed by this state backend.
    -	 * 
    +	 *
    +	 * @param operatorId Unique id for the operator creating the state
    +	 * @param stateName Name of the created state
     	 * @param keySerializer The serializer for the key.
     	 * @param valueSerializer The serializer for the value.
     	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
     	 * @param <K> The type of the key.
     	 * @param <V> The type of the value.
    -	 * 
    +	 *
     	 * @return A new key/value state backed by this backend.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
     	 */
    -	public abstract <K, V> KvState<K, V, Backend> createKvState(
    +	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
    --- End diff --
    
    Otherwise you will have to create p*numShards tables and you wont even know what state is in it from looking at the table names


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-155765000
  
    Should we do a final iteration over this and merge this to contrib?
    
    The description got slightly out of date when I changed this back so that it stores the state by timestamp (but its basically ctr+f replace id with timestamp)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-158107515
  
    I have a final comment inline. Otherwise, I think this is good to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45373479
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
     	/**
     	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
     	 * checkpoint data.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions can be forwarded and will be logged by the system
     	 */
     	public abstract void close() throws Exception;
    -	
    +
     	// ------------------------------------------------------------------------
     	//  key/value state
     	// ------------------------------------------------------------------------
     
     	/**
     	 * Creates a key/value state backed by this state backend.
    -	 * 
    +	 *
    +	 * @param operatorId Unique id for the operator creating the state
    +	 * @param stateName Name of the created state
     	 * @param keySerializer The serializer for the key.
     	 * @param valueSerializer The serializer for the value.
     	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
     	 * @param <K> The type of the key.
     	 * @param <V> The type of the value.
    -	 * 
    +	 *
     	 * @return A new key/value state backed by this backend.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
     	 */
    -	public abstract <K, V> KvState<K, V, Backend> createKvState(
    +	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
    --- End diff --
    
    The "name" (as a string) of the state is a very API specific thing that no other part of the runtime is concerned with. The operator ID is something specific to the StreamGraphBuilder and not to the streaming tasks at all. I think we are tying things together here that should not be tied together.
    
    I still do not understand how this affects sharding. Does the shard assignment depend on the state name (rather than the parallel subtask / JobVertexId) ?
    
    I only see that the table names will have the task name instead of the name of the state.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45372303
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
     	/**
     	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
     	 * checkpoint data.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions can be forwarded and will be logged by the system
     	 */
     	public abstract void close() throws Exception;
    -	
    +
     	// ------------------------------------------------------------------------
     	//  key/value state
     	// ------------------------------------------------------------------------
     
     	/**
     	 * Creates a key/value state backed by this state backend.
    -	 * 
    +	 *
    +	 * @param operatorId Unique id for the operator creating the state
    +	 * @param stateName Name of the created state
     	 * @param keySerializer The serializer for the key.
     	 * @param valueSerializer The serializer for the value.
     	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
     	 * @param <K> The type of the key.
     	 * @param <V> The type of the value.
    -	 * 
    +	 *
     	 * @return A new key/value state backed by this backend.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
     	 */
    -	public abstract <K, V> KvState<K, V, Backend> createKvState(
    +	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
    --- End diff --
    
    The point is that all parallel instances write to the same set of tables. This will way sharding is transparently handled and the job parallelism can actually change without affecting the state. (No need to repartition it)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-152944197
  
    Good stuff! Will need a day more to look through this, but this is a cool way of doing stateful stream computation :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153333538
  
    2. Ah, I meant the lookupTimestamp. In an earlier version you used both the checkpointId and lookupTimestamp to perform key lookups.
    3. I see, in this implementation of state the timestamp has basically assumed the role of the checkpointId and the checkpointId is (I think) completely ignored. Correct? Couldn't we then change the semantics of the checkpointId to work like the timestamps (they are somewhat logical, not physical timestamps anyways)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153305943
  
    Just some remarks:
     - `DummyEnvironment` seems unnecessary, we already have `StreamMockEnvironment`. I think it could be reused.
    
    - In the first version you had both the timestamp and checkpoint and recovery/key lookup took both into account. The recent version uses just the timestamp for lookup. Both introduce the new restore timestamp in the restore methods.
    
    - The cleanup of failed checkpoints took into account the checkpoint and the recovery timestamp, but I think the recovery timestamp was always redundant since the condition in the SQL statement would always hold.
    
    => I think the timestamp is not needed. Can't everything be implemented by just using the (monotonically rising) checkpoint IDs?
    
    Also, this is unrelated but maybe @StephanEwen or @gyfora know: Why to we have the `allOrNothingState` in `CheckpointCoordinator.restoreLatestCheckpointedState`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45474858
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
     	/**
     	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
     	 * checkpoint data.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions can be forwarded and will be logged by the system
     	 */
     	public abstract void close() throws Exception;
    -	
    +
     	// ------------------------------------------------------------------------
     	//  key/value state
     	// ------------------------------------------------------------------------
     
     	/**
     	 * Creates a key/value state backed by this state backend.
    -	 * 
    +	 *
    +	 * @param operatorId Unique id for the operator creating the state
    +	 * @param stateName Name of the created state
     	 * @param keySerializer The serializer for the key.
     	 * @param valueSerializer The serializer for the value.
     	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
     	 * @param <K> The type of the key.
     	 * @param <V> The type of the value.
    -	 * 
    +	 *
     	 * @return A new key/value state backed by this backend.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
     	 */
    -	public abstract <K, V> KvState<K, V, Backend> createKvState(
    +	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
    --- End diff --
    
    Thanks for the description of the sharding. The issue is that you need a deterministic table name that each KeyValueState can create independently.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-156064645
  
    I totally understand your point, but I think it's OK that changes of this scope take longer to review and get in (my HA PR took over a month or so to get in). At the end of the day, it matters more that we get this right (because it covers a very important use case) than getting it in a few days earlier.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153470424
  
    After some initial discussion we @StephanEwen we came to the following conclusions:
    
    The current timestamp based approach has some limitations in terms of the assumptions it makes about the clocks on the checkpoint coordinator which can cause some issues in case of master failure. It is currently assumed that the recovery timestamp assigned by the checkpoint coordinator is larger than the last checkpoint timestamp assigned before failure. If the clock difference is large enough this might be a problem, although fairly unlikely.
    
    The alternative approach would be to store the id, key, value triplets in the database (drop the timestamps). Snapshots written between two snapshots would be written with an incremented checkpoint id (and multiple spills to the same key would update the value not insert a new one every time). Cleanup could then be performed by getting the next checkpoint id from the coordinator (which would get it from zookeper). The drawback in this case would be an increased complexity for the batch inserts as we need to be able to handle primary key conflicts efficiently. While this is easy on some databases  (in MySql) in other cases (Derby) can be problematic.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-156809400
  
    I have updated the description, and ran some more cluster tests without any issues.
    
    It would be good if you all could do a second round of reviews please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45351298
  
    --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java ---
    @@ -0,0 +1,406 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *	http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import java.io.Serializable;
    +import java.sql.SQLException;
    +import java.util.List;
    +
    +import org.apache.flink.contrib.streaming.state.ShardedConnection.Partitioner;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * 
    + * Configuration object for {@link DbStateBackend}, containing information to
    + * shard and connect to the databases that will store the state checkpoints.
    + *
    + */
    +public class DbBackendConfig implements Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	// Database connection properties
    +	private final String userName;
    +	private final String userPassword;
    +	private final List<String> shardUrls;
    +
    +	// JDBC Driver + DbAdapter information
    +	private DbAdapter dbAdapter = new MySqlAdapter();
    +	private String JDBCDriver = null;
    +
    +	private int maxNumberOfSqlRetries = 5;
    +	private int sleepBetweenSqlRetries = 100;
    +
    +	// KvState properties
    +	private int kvStateCacheSize = 10000;
    +	private int maxKvInsertBatchSize = 1000;
    +	private float maxKvEvictFraction = 0.1f;
    +	private int kvStateCompactionFreq = -1;
    +
    +	private Partitioner shardPartitioner;
    +
    +	/**
    +	 * Creates a new sharded database state backend configuration with the given
    +	 * parameters and default {@link MySqlAdapter}.
    +	 * 
    +	 * @param dbUserName
    +	 *            The username used to connect to the database at the given url.
    +	 * @param dbUserPassword
    +	 *            The password used to connect to the database at the given url
    +	 *            and username.
    +	 * @param dbShardUrls
    +	 *            The list of JDBC urls of the databases that will be used as
    +	 *            shards for the state backend. Sharding of the state will
    +	 *            happen based on the subtask index of the given task.
    +	 */
    +	public DbBackendConfig(String dbUserName, String dbUserPassword, List<String> dbShardUrls) {
    +		this.userName = dbUserName;
    +		this.userPassword = dbUserPassword;
    +		this.shardUrls = dbShardUrls;
    +	}
    +
    +	/**
    +	 * Creates a new database state backend configuration with the given
    +	 * parameters and default {@link MySqlAdapter}.
    +	 * 
    +	 * @param dbUserName
    +	 *            The username used to connect to the database at the given url.
    +	 * @param dbUserPassword
    +	 *            The password used to connect to the database at the given url
    +	 *            and username.
    +	 * @param dbUrl
    +	 *            The JDBC url of the database for example
    +	 *            "jdbc:mysql://localhost:3306/flinkdb".
    +	 */
    +	public DbBackendConfig(String dbUserName, String dbUserPassword, String dbUrl) {
    +		this(dbUserName, dbUserPassword, Lists.newArrayList(dbUrl));
    +	}
    +
    +	/**
    +	 * The username used to connect to the database at the given urls.
    +	 */
    +	public String getUserName() {
    +		return userName;
    +	}
    +
    +	/**
    +	 * The password used to connect to the database at the given url and
    +	 * username.
    +	 */
    +	public String getUserPassword() {
    +		return userPassword;
    +	}
    +
    +	/**
    +	 * Number of database shards defined.
    +	 */
    +	public int getNumberOfShards() {
    +		return shardUrls.size();
    +	}
    +
    +	/**
    +	 * Database shard urls as provided in the constructor.
    +	 * 
    +	 */
    +	public List<String> getShardUrls() {
    +		return shardUrls;
    +	}
    +
    +	/**
    +	 * The url of the first shard.
    +	 * 
    +	 */
    +	public String getUrl() {
    +		return getShardUrl(0);
    +	}
    +
    +	/**
    +	 * The url of a specific shard.
    +	 * 
    +	 */
    +	public String getShardUrl(int shardIndex) {
    +		validateShardIndex(shardIndex);
    +		return shardUrls.get(shardIndex);
    +	}
    +
    +	private void validateShardIndex(int i) {
    +		if (i < 0) {
    +			throw new IllegalArgumentException("Index must be positive.");
    +		} else if (getNumberOfShards() <= i) {
    +			throw new IllegalArgumentException("Index must be less then the total number of shards.");
    +		}
    +	}
    +
    +	/**
    +	 * Get the {@link DbAdapter} that will be used to operate on the database
    +	 * during checkpointing.
    +	 * 
    +	 */
    +	public DbAdapter getDbAdapter() {
    +		return dbAdapter;
    +	}
    +
    +	/**
    +	 * Set the {@link DbAdapter} that will be used to operate on the database
    +	 * during checkpointing.
    +	 * 
    +	 */
    +	public void setDbAdapter(DbAdapter adapter) {
    +		this.dbAdapter = adapter;
    +	}
    +
    +	/**
    +	 * The class name that should be used to load the JDBC driver using
    +	 * Class.forName(JDBCDriverClass).
    +	 */
    +	public String getJDBCDriver() {
    +		return JDBCDriver;
    +	}
    +
    +	/**
    +	 * Set the class name that should be used to load the JDBC driver using
    +	 * Class.forName(JDBCDriverClass).
    +	 */
    +	public void setJDBCDriver(String jDBCDriverClassName) {
    +		JDBCDriver = jDBCDriverClassName;
    +	}
    +
    +	/**
    +	 * The maximum number of key-value pairs stored in one task instance's cache
    +	 * before evicting to the underlying database.
    +	 *
    +	 */
    +	public int getKvCacheSize() {
    +		return kvStateCacheSize;
    +	}
    +
    +	/**
    +	 * Set the maximum number of key-value pairs stored in one task instance's
    +	 * cache before evicting to the underlying database. When the cache is full
    +	 * the N least recently used keys will be evicted to the database, where N =
    +	 * maxKvEvictFraction*KvCacheSize.
    +	 *
    +	 */
    +	public void setKvCacheSize(int size) {
    +		kvStateCacheSize = size;
    +	}
    +
    +	/**
    +	 * The maximum number of key-value pairs inserted in the database as one
    +	 * batch operation.
    +	 */
    +	public int getMaxKvInsertBatchSize() {
    +		return maxKvInsertBatchSize;
    +	}
    +
    +	/**
    +	 * Set the maximum number of key-value pairs inserted in the database as one
    +	 * batch operation.
    +	 */
    +	public void setMaxKvInsertBatchSize(int size) {
    +		maxKvInsertBatchSize = size;
    +	}
    +
    +	/**
    +	 * Sets the maximum fraction of key-value states evicted from the cache if
    +	 * the cache is full.
    +	 */
    +	public void setMaxKvCacheEvictFraction(float fraction) {
    +		if (fraction > 1 || fraction <= 0) {
    +			throw new RuntimeException("Must be a number between 0 and 1");
    +		} else {
    +			maxKvEvictFraction = fraction;
    +		}
    +	}
    +
    +	/**
    +	 * The maximum fraction of key-value states evicted from the cache if the
    +	 * cache is full.
    +	 */
    +	public float getMaxKvCacheEvictFraction() {
    +		return maxKvEvictFraction;
    +	}
    +
    +	/**
    +	 * The number of elements that will be evicted when the cache is full.
    +	 * 
    +	 */
    +	public int getNumElementsToEvict() {
    +		return (int) Math.ceil(getKvCacheSize() * getMaxKvCacheEvictFraction());
    +	}
    +
    +	/**
    +	 * Sets how often will automatic compaction be performed on the database to
    +	 * remove old overwritten state changes. The frequency is set in terms of
    +	 * number of successful checkpoints between two compactions and should take
    +	 * the state size and checkpoint frequency into account.
    +	 * <p>
    +	 * By default automatic compaction is turned off.
    +	 */
    +	public void setKvStateCompactionFrequency(int compactEvery) {
    +		this.kvStateCompactionFreq = compactEvery;
    +	}
    +
    +	/**
    +	 * Sets how often will automatic compaction be performed on the database to
    +	 * remove old overwritten state changes. The frequency is set in terms of
    +	 * number of successful checkpoints between two compactions and should take
    +	 * the state size and checkpoint frequency into account.
    +	 * <p>
    +	 * By default automatic compaction is turned off.
    +	 */
    +	public int getKvStateCompactionFrequency() {
    +		return kvStateCompactionFreq;
    +	}
    +
    +	/**
    +	 * The number of times each SQL command will be retried on failure.
    +	 */
    +	public int getMaxNumberOfSqlRetries() {
    +		return maxNumberOfSqlRetries;
    +	}
    +
    +	/**
    +	 * Sets the number of times each SQL command will be retried on failure.
    +	 */
    +	public void setMaxNumberOfSqlRetries(int maxNumberOfSqlRetries) {
    +		this.maxNumberOfSqlRetries = maxNumberOfSqlRetries;
    +	}
    +
    +	/**
    +	 * The number of milliseconds slept between two SQL retries. The actual
    +	 * sleep time will be chosen randomly between 1 and the given time.
    +	 * 
    +	 */
    +	public int getSleepBetweenSqlRetries() {
    +		return sleepBetweenSqlRetries;
    +	}
    +
    +	/**
    +	 * Sets the number of milliseconds slept between two SQL retries. The actual
    +	 * sleep time will be chosen randomly between 1 and the given time.
    +	 * 
    +	 */
    +	public void setSleepBetweenSqlRetries(int sleepBetweenSqlRetries) {
    +		this.sleepBetweenSqlRetries = sleepBetweenSqlRetries;
    +	}
    +
    +	/**
    +	 * Sets the partitioner used to assign keys to different database shards
    +	 */
    +	public void setPartitioner(Partitioner partitioner) {
    +		this.shardPartitioner = partitioner;
    +	}
    +
    +	/**
    +	 * Creates a new {@link ShardedConnection} using the set parameters.
    +	 * 
    +	 * @throws SQLException
    +	 */
    +	public ShardedConnection createShardedConnection() throws SQLException {
    +		if (JDBCDriver != null) {
    +			try {
    +				Class.forName(JDBCDriver);
    +			} catch (ClassNotFoundException e) {
    +				throw new RuntimeException("Could not load JDBC driver class", e);
    +			}
    +		}
    +		if (shardPartitioner == null) {
    +			return new ShardedConnection(shardUrls, userName, userPassword);
    +		} else {
    +			return new ShardedConnection(shardUrls, userName, userPassword, shardPartitioner);
    +		}
    +	}
    +
    +	@Override
    +	public boolean equals(Object obj) {
    --- End diff --
    
    Overriding equals() but not hashCode(). Not sure what the usage of this object is, but dropping the equals method may make sense (do you ever compare configs fro equality?).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-156039726
  
    Well, I don't know what they are working on... It would be easier not having to rebase state backend api changes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45479951
  
    --- Diff: flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *	http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.concurrent.Callable;
    +
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.eclipse.jetty.util.log.Log;
    +
    +/**
    + * State handle implementation for storing checkpoints as byte arrays in
    + * databases using the {@link MySqlAdapter} defined in the {@link DbBackendConfig}.
    + * 
    + */
    +public class DbStateHandle<S> implements Serializable, StateHandle<S> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private final String jobId;
    +	private final DbBackendConfig dbConfig;
    +
    +	private final long checkpointId;
    +	private final long checkpointTs;
    +
    +	private final long handleId;
    +
    +	public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) {
    +		this.checkpointId = checkpointId;
    +		this.handleId = handleId;
    +		this.jobId = jobId;
    +		this.dbConfig = dbConfig;
    +		this.checkpointTs = checkpointTs;
    +	}
    +
    +	protected byte[] getBytes() throws IOException {
    +		return retry(new Callable<byte[]>() {
    +			public byte[] call() throws Exception {
    +				try (ShardedConnection con = dbConfig.createShardedConnection()) {
    +					return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
    +				}
    +			}
    +		}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
    +	}
    +
    +	@Override
    +	public void discardState() {
    +		try {
    +			retry(new Callable<Boolean>() {
    +				public Boolean call() throws Exception {
    +					try (ShardedConnection con = dbConfig.createShardedConnection()) {
    +						dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
    +					}
    +					return true;
    +				}
    +			}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
    +		} catch (IOException e) {
    +			// We don't want to fail the job here, but log the error.
    +			if (Log.isDebugEnabled()) {
    --- End diff --
    
    Good catch, thanks Robert :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-159205217
  
    I think you can go ahead. It's in contrib and you guys are battle-testing it anyways... :wink: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45371919
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
     	/**
     	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
     	 * checkpoint data.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions can be forwarded and will be logged by the system
     	 */
     	public abstract void close() throws Exception;
    -	
    +
     	// ------------------------------------------------------------------------
     	//  key/value state
     	// ------------------------------------------------------------------------
     
     	/**
     	 * Creates a key/value state backed by this state backend.
    -	 * 
    +	 *
    +	 * @param operatorId Unique id for the operator creating the state
    +	 * @param stateName Name of the created state
     	 * @param keySerializer The serializer for the key.
     	 * @param valueSerializer The serializer for the value.
     	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
     	 * @param <K> The type of the key.
     	 * @param <V> The type of the value.
    -	 * 
    +	 *
     	 * @return A new key/value state backed by this backend.
    -	 * 
    +	 *
     	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
     	 */
    -	public abstract <K, V> KvState<K, V, Backend> createKvState(
    +	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
    --- End diff --
    
    I suggest to not let the operator supply an ID and name, but simply leave the naming of the state to the state backend. The SqlStateBackend could just use `UUID.randomUUID().toString()` instead of `operatorId+stateName`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153339094
  
    @StephanEwen 
    Thanks for the comments. You are right the main idea is exactly as you described. 
    The reason why exactly-once is violated in some corner cases because it can happen that the pre-commit phase of the previous checkpoint is still failing during recovery. 
    
    If we assume that the previous job is completely killed of, no writing to the database whatsoever after that happens, then we can properly clean up during recovery. 
    
    This unfortunately does not seem to hold if you set the retry wait time to very low (like 0 ms in the snapshot). What this means is that the failed job is still writing the failed snapshot to the database after you recovered and cleaned up.
    
    As for the compaction, I came up with something very similar for compaction but here is the funny thing and my problem. The query you wrote will run properly on Derby but is invalid on MySql (you cannot create a subquery for the same table as you are modifying). In mysql you need to create an inner join, but that will not work in Derby :P 
    
    In any case I have made a prototype of this on: https://github.com/gyfora/flink/tree/compaction
    The user can define the frequency of compaction (compact every so many checkpoints). And it also makes sure that compaction and cleanup is only executed on 1 subtask to avoid double work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-151884357
  
    Wow, a lot of stuff. I will look into it once the release is out. :smiley: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Out-of-core state backend for JDBC databases

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-152948336
  
    Thanks for the great write up!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---