You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/11/24 10:45:09 UTC
[7/8] flink git commit: [FLINK-2924] [streaming] Use short job id for table names
[FLINK-2924] [streaming] Use short job id for table names
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd8be0b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd8be0b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd8be0b7
Branch: refs/heads/master
Commit: cd8be0b785722eae021ceaca8887025d6b61cf12
Parents: 43b8e57
Author: Gyula Fora <gy...@apache.org>
Authored: Thu Nov 12 09:52:13 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Nov 24 09:28:39 2015 +0100
----------------------------------------------------------------------
.../contrib/streaming/state/DbAdapter.java | 11 +++++++---
.../contrib/streaming/state/DbStateBackend.java | 21 +++++++++++---------
.../contrib/streaming/state/DbStateHandle.java | 2 +-
.../contrib/streaming/state/MySqlAdapter.java | 12 +++++------
.../streaming/state/DbStateBackendTest.java | 20 +++++++++----------
.../contrib/streaming/state/DerbyAdapter.java | 10 +++++-----
.../runtime/executiongraph/ExecutionVertex.java | 4 ++--
.../apache/flink/runtime/taskmanager/Task.java | 4 ++--
8 files changed, 46 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
index 2b4a911..2162f32 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
@@ -26,6 +26,11 @@ import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
+/**
+ * Adapter interface for executing different checkpointing related operations on
+ * the underlying database.
+ *
+ */
public interface DbAdapter extends Serializable {
/**
@@ -168,9 +173,9 @@ public interface DbAdapter extends Serializable {
long checkpointTimestamp, List<Tuple2<byte[], byte[]>> toInsert) throws IOException;
/**
- * Compact the states between two checkpoint timestamp by only keeping the most
- * recent.
+ * Compact the states between two checkpoint timestamp by only keeping the
+ * most recent.
*/
void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index e5b1ad8..dce0df8 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -147,14 +147,15 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
// We create a unique long id for each handle, but we also
// store the checkpoint id and timestamp for bookkeeping
long handleId = rnd.nextLong();
+ String jobIdShort = env.getJobID().toShortString();
- dbAdapter.setCheckpointInsertParams(env.getJobID().toString(), insertStatement,
+ dbAdapter.setCheckpointInsertParams(jobIdShort, insertStatement,
checkpointID, timestamp, handleId,
InstantiationUtil.serializeObject(state));
insertStatement.executeUpdate();
- return new DbStateHandle<S>(env.getJobID().toString(), checkpointID, timestamp, handleId,
+ return new DbStateHandle<S>(jobIdShort, checkpointID, timestamp, handleId,
dbConfig);
}
}, numSqlRetries, sqlRetrySleep);
@@ -179,7 +180,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
public <K, V> LazyDbKvState<K, V> createKvState(int operatorId, String stateName,
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
return new LazyDbKvState<K, V>(
- env.getJobID() + "_" + operatorId + "_" + stateName,
+ stateName + "_" + operatorId + "_" + env.getJobID().toShortString(),
env.getIndexInSubtaskGroup() == 0,
getConnections(),
getConfiguration(),
@@ -194,7 +195,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
this.env = env;
connections = dbConfig.createShardedConnection();
-
+
// We want the most light-weight transaction isolation level as we don't
// have conflicting reads/writes. We just want to be able to roll back
// batch inserts for k-v snapshots. This requirement might be removed in
@@ -203,13 +204,15 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
// If we have a different backend for non-partitioned states we
// initialize that, otherwise create tables for storing the checkpoints.
- //
- // Currently all non-partitioned states are written to the first database shard
+ //
+ // Currently all non-partitioned states are written to the first
+ // database shard
if (nonPartitionedStateBackend == null) {
insertStatement = retry(new Callable<PreparedStatement>() {
public PreparedStatement call() throws SQLException {
- dbAdapter.createCheckpointsTable(env.getJobID().toString(), getConnections().getFirst());
- return dbAdapter.prepareCheckpointInsert(env.getJobID().toString(), getConnections().getFirst());
+ dbAdapter.createCheckpointsTable(env.getJobID().toShortString(), getConnections().getFirst());
+ return dbAdapter.prepareCheckpointInsert(env.getJobID().toShortString(),
+ getConnections().getFirst());
}
}, numSqlRetries, sqlRetrySleep);
} else {
@@ -237,7 +240,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
@Override
public void disposeAllStateForCurrentJob() throws Exception {
if (nonPartitionedStateBackend == null) {
- dbAdapter.disposeAllStateForJob(env.getJobID().toString(), connections.getFirst());
+ dbAdapter.disposeAllStateForJob(env.getJobID().toShortString(), connections.getFirst());
} else {
nonPartitionedStateBackend.disposeAllStateForCurrentJob();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
index 05b6f0a..fa300a4 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
@@ -85,4 +85,4 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
public S getState(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
return InstantiationUtil.deserializeObject(getBytes(), userCodeClassLoader);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
index c47d6f4..7d3eca0 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
@@ -118,7 +118,7 @@ public class MySqlAdapter implements DbAdapter {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
- "CREATE TABLE IF NOT EXISTS kvstate_" + stateId
+ "CREATE TABLE IF NOT EXISTS " + stateId
+ " ("
+ "timestamp bigint, "
+ "k varbinary(256), "
@@ -131,7 +131,7 @@ public class MySqlAdapter implements DbAdapter {
@Override
public String prepareKVCheckpointInsert(String stateId) throws SQLException {
validateStateId(stateId);
- return "INSERT INTO kvstate_" + stateId + " (timestamp, k, v) VALUES (?,?,?) "
+ return "INSERT INTO " + stateId + " (timestamp, k, v) VALUES (?,?,?) "
+ "ON DUPLICATE KEY UPDATE v=? ";
}
@@ -139,7 +139,7 @@ public class MySqlAdapter implements DbAdapter {
public String prepareKeyLookup(String stateId) throws SQLException {
validateStateId(stateId);
return "SELECT v"
- + " FROM kvstate_" + stateId
+ + " FROM " + stateId
+ " WHERE k = ?"
+ " AND timestamp <= ?"
+ " ORDER BY timestamp DESC LIMIT 1";
@@ -165,7 +165,7 @@ public class MySqlAdapter implements DbAdapter {
long recoveryTs) throws SQLException {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
- smt.executeUpdate("DELETE FROM kvstate_" + stateId
+ smt.executeUpdate("DELETE FROM " + stateId
+ " WHERE timestamp > " + checkpointTs
+ " AND timestamp < " + recoveryTs);
}
@@ -177,10 +177,10 @@ public class MySqlAdapter implements DbAdapter {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
- smt.executeUpdate("DELETE state.* FROM kvstate_" + stateId + " AS state"
+ smt.executeUpdate("DELETE state.* FROM " + stateId + " AS state"
+ " JOIN"
+ " ("
- + " SELECT MAX(timestamp) AS maxts, k FROM kvstate_" + stateId
+ + " SELECT MAX(timestamp) AS maxts, k FROM " + stateId
+ " WHERE timestamp BETWEEN " + lowerId + " AND " + upperId
+ " GROUP BY k"
+ " ) m"
http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index c2f306f..5f8610e 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -106,10 +106,10 @@ public class DbStateBackendTest {
backend.initializeForJob(env);
assertNotNull(backend.getConnections());
- assertTrue(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
+ assertTrue(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
backend.disposeAllStateForCurrentJob();
- assertFalse(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
+ assertFalse(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
backend.close();
assertTrue(backend.getConnections().getFirst().isClosed());
@@ -139,12 +139,12 @@ public class DbStateBackendTest {
assertEquals(state2, handle2.getState(getClass().getClassLoader()));
handle2.discardState();
- assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
+ assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
assertEquals(state3, handle3.getState(getClass().getClassLoader()));
handle3.discardState();
- assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
+ assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
backend.close();
@@ -168,7 +168,7 @@ public class DbStateBackendTest {
LazyDbKvState<Integer, String> kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE,
StringSerializer.INSTANCE, null);
- String tableName = "kvstate_" + env.getJobID() + "_1_state1";
+ String tableName = "state1_1_" + env.getJobID().toShortString();
assertTrue(isTableCreated(backend.getConnections().getFirst(), tableName));
assertEquals(0, kv.size());
@@ -257,7 +257,7 @@ public class DbStateBackendTest {
@Test
public void testCaching() throws Exception {
-
+
List<String> urls = Lists.newArrayList(url1, url2);
DbBackendConfig conf = new DbBackendConfig("flink", "flink",
urls);
@@ -273,7 +273,7 @@ public class DbStateBackendTest {
Environment env = new DummyEnvironment("test", 2, 0);
- String tableName = "kvstate_" + env.getJobID() + "_1_state1";
+ String tableName = "state1_1_" + env.getJobID().toShortString();
assertFalse(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName));
assertFalse(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName));
@@ -281,10 +281,10 @@ public class DbStateBackendTest {
LazyDbKvState<Integer, String> kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE,
StringSerializer.INSTANCE, "a");
-
+
assertTrue(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName));
assertTrue(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName));
-
+
Map<Integer, Optional<String>> cache = kv.getStateCache();
Map<Integer, Optional<String>> modified = kv.getModified();
@@ -432,7 +432,7 @@ public class DbStateBackendTest {
return smt.executeQuery().next();
}
}
-
+
private static String localFileUri(File path) {
return path.toURI().toString();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
index d4fc838..1f13f4b 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
@@ -73,7 +73,7 @@ public class DerbyAdapter extends MySqlAdapter {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
- "CREATE TABLE kvstate_" + stateId
+ "CREATE TABLE " + stateId
+ " ("
+ "timestamp bigint, "
+ "k varchar(256) for bit data, "
@@ -96,7 +96,7 @@ public class DerbyAdapter extends MySqlAdapter {
@Override
public String prepareKeyLookup(String stateId) throws SQLException {
validateStateId(stateId);
- return "SELECT v " + "FROM kvstate_" + stateId
+ return "SELECT v " + "FROM " + stateId
+ " WHERE k = ? "
+ " AND timestamp <= ?"
+ " ORDER BY timestamp DESC";
@@ -108,10 +108,10 @@ public class DerbyAdapter extends MySqlAdapter {
validateStateId(stateId);
try (Statement smt = con.createStatement()) {
- smt.executeUpdate("DELETE FROM kvstate_" + stateId + " t1"
+ smt.executeUpdate("DELETE FROM " + stateId + " t1"
+ " WHERE EXISTS"
+ " ("
- + " SELECT * FROM kvstate_" + stateId + " t2"
+ + " SELECT * FROM " + stateId + " t2"
+ " WHERE t2.k = t1.k"
+ " AND t2.timestamp > t1.timestamp"
+ " AND t2.timestamp <=" + upperBound
@@ -123,7 +123,7 @@ public class DerbyAdapter extends MySqlAdapter {
@Override
public String prepareKVCheckpointInsert(String stateId) throws SQLException {
validateStateId(stateId);
- return "INSERT INTO kvstate_" + stateId + " (timestamp, k, v) VALUES (?,?,?)";
+ return "INSERT INTO " + stateId + " (timestamp, k, v) VALUES (?,?,?)";
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index cff82c1..fba5652 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -617,7 +617,7 @@ public class ExecutionVertex implements Serializable {
ExecutionAttemptID executionId,
SimpleSlot targetSlot,
SerializedValue<StateHandle<?>> operatorState,
- long nextCheckpointId) {
+ long recoveryTimestamp) {
// Produced intermediate results
List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
@@ -652,7 +652,7 @@ public class ExecutionVertex implements Serializable {
subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
producedPartitions, consumedPartitions, jarFiles, classpaths, targetSlot.getRoot().getSlotNumber(),
- operatorState, nextCheckpointId);
+ operatorState, recoveryTimestamp);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index bf086bb..ae1c0cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -538,14 +538,14 @@ public class Task implements Runnable {
// get our private reference onto the stack (be safe against concurrent changes)
SerializedValue<StateHandle<?>> operatorState = this.operatorState;
- long nextCheckpointId = this.recoveryTs;
+ long recoveryTs = this.recoveryTs;
if (operatorState != null) {
if (invokable instanceof StatefulTask) {
try {
StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
StatefulTask<?> op = (StatefulTask<?>) invokable;
- StateUtils.setOperatorState(op, state, nextCheckpointId);
+ StateUtils.setOperatorState(op, state, recoveryTs);
}
catch (Exception e) {
throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);