You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/11/24 10:45:09 UTC

[7/8] flink git commit: [FLINK-2924] [streaming] Use short job id for table names

[FLINK-2924] [streaming] Use short job id for table names


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd8be0b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd8be0b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd8be0b7

Branch: refs/heads/master
Commit: cd8be0b785722eae021ceaca8887025d6b61cf12
Parents: 43b8e57
Author: Gyula Fora <gy...@apache.org>
Authored: Thu Nov 12 09:52:13 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Nov 24 09:28:39 2015 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/DbAdapter.java      | 11 +++++++---
 .../contrib/streaming/state/DbStateBackend.java | 21 +++++++++++---------
 .../contrib/streaming/state/DbStateHandle.java  |  2 +-
 .../contrib/streaming/state/MySqlAdapter.java   | 12 +++++------
 .../streaming/state/DbStateBackendTest.java     | 20 +++++++++----------
 .../contrib/streaming/state/DerbyAdapter.java   | 10 +++++-----
 .../runtime/executiongraph/ExecutionVertex.java |  4 ++--
 .../apache/flink/runtime/taskmanager/Task.java  |  4 ++--
 8 files changed, 46 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
index 2b4a911..2162f32 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
@@ -26,6 +26,11 @@ import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 
+/**
+ * Adapter interface for executing the different checkpointing-related
+ * operations on the underlying database.
+ *
+ */
 public interface DbAdapter extends Serializable {
 
 	/**
@@ -168,9 +173,9 @@ public interface DbAdapter extends Serializable {
 			long checkpointTimestamp, List<Tuple2<byte[], byte[]>> toInsert) throws IOException;
 
 	/**
-	 * Compact the states between two checkpoint timestamp by only keeping the most
-	 * recent.
+	 * Compact the states between two checkpoint timestamps by keeping only the
+	 * most recent entry for each key.
 	 */
 	void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException;
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index e5b1ad8..dce0df8 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -147,14 +147,15 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 					// We create a unique long id for each handle, but we also
 					// store the checkpoint id and timestamp for bookkeeping
 					long handleId = rnd.nextLong();
+					String jobIdShort = env.getJobID().toShortString();
 
-					dbAdapter.setCheckpointInsertParams(env.getJobID().toString(), insertStatement,
+					dbAdapter.setCheckpointInsertParams(jobIdShort, insertStatement,
 							checkpointID, timestamp, handleId,
 							InstantiationUtil.serializeObject(state));
 
 					insertStatement.executeUpdate();
 
-					return new DbStateHandle<S>(env.getJobID().toString(), checkpointID, timestamp, handleId,
+					return new DbStateHandle<S>(jobIdShort, checkpointID, timestamp, handleId,
 							dbConfig);
 				}
 			}, numSqlRetries, sqlRetrySleep);
@@ -179,7 +180,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 	public <K, V> LazyDbKvState<K, V> createKvState(int operatorId, String stateName,
 			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
 		return new LazyDbKvState<K, V>(
-				env.getJobID() + "_" + operatorId + "_" + stateName,
+				stateName + "_" + operatorId + "_" + env.getJobID().toShortString(),
 				env.getIndexInSubtaskGroup() == 0,
 				getConnections(),
 				getConfiguration(),
@@ -194,7 +195,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 		this.env = env;
 
 		connections = dbConfig.createShardedConnection();
-		
+
 		// We want the most light-weight transaction isolation level as we don't
 		// have conflicting reads/writes. We just want to be able to roll back
 		// batch inserts for k-v snapshots. This requirement might be removed in
@@ -203,13 +204,15 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 
 		// If we have a different backend for non-partitioned states we
 		// initialize that, otherwise create tables for storing the checkpoints.
-		// 
-		// Currently all non-partitioned states are written to the first database shard
+		//
+		// Currently all non-partitioned states are written to the first
+		// database shard
 		if (nonPartitionedStateBackend == null) {
 			insertStatement = retry(new Callable<PreparedStatement>() {
 				public PreparedStatement call() throws SQLException {
-					dbAdapter.createCheckpointsTable(env.getJobID().toString(), getConnections().getFirst());
-					return dbAdapter.prepareCheckpointInsert(env.getJobID().toString(), getConnections().getFirst());
+					dbAdapter.createCheckpointsTable(env.getJobID().toShortString(), getConnections().getFirst());
+					return dbAdapter.prepareCheckpointInsert(env.getJobID().toShortString(),
+							getConnections().getFirst());
 				}
 			}, numSqlRetries, sqlRetrySleep);
 		} else {
@@ -237,7 +240,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 	@Override
 	public void disposeAllStateForCurrentJob() throws Exception {
 		if (nonPartitionedStateBackend == null) {
-			dbAdapter.disposeAllStateForJob(env.getJobID().toString(), connections.getFirst());
+			dbAdapter.disposeAllStateForJob(env.getJobID().toShortString(), connections.getFirst());
 		} else {
 			nonPartitionedStateBackend.disposeAllStateForCurrentJob();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
index 05b6f0a..fa300a4 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
@@ -85,4 +85,4 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 	public S getState(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
 		return InstantiationUtil.deserializeObject(getBytes(), userCodeClassLoader);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
index c47d6f4..7d3eca0 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
@@ -118,7 +118,7 @@ public class MySqlAdapter implements DbAdapter {
 		validateStateId(stateId);
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(
-					"CREATE TABLE IF NOT EXISTS kvstate_" + stateId
+					"CREATE TABLE IF NOT EXISTS " + stateId
 							+ " ("
 							+ "timestamp bigint, "
 							+ "k varbinary(256), "
@@ -131,7 +131,7 @@ public class MySqlAdapter implements DbAdapter {
 	@Override
 	public String prepareKVCheckpointInsert(String stateId) throws SQLException {
 		validateStateId(stateId);
-		return "INSERT INTO kvstate_" + stateId + " (timestamp, k, v) VALUES (?,?,?) "
+		return "INSERT INTO " + stateId + " (timestamp, k, v) VALUES (?,?,?) "
 				+ "ON DUPLICATE KEY UPDATE v=? ";
 	}
 
@@ -139,7 +139,7 @@ public class MySqlAdapter implements DbAdapter {
 	public String prepareKeyLookup(String stateId) throws SQLException {
 		validateStateId(stateId);
 		return "SELECT v"
-				+ " FROM kvstate_" + stateId
+				+ " FROM " + stateId
 				+ " WHERE k = ?"
 				+ " AND timestamp <= ?"
 				+ " ORDER BY timestamp DESC LIMIT 1";
@@ -165,7 +165,7 @@ public class MySqlAdapter implements DbAdapter {
 			long recoveryTs) throws SQLException {
 		validateStateId(stateId);
 		try (Statement smt = con.createStatement()) {
-			smt.executeUpdate("DELETE FROM kvstate_" + stateId
+			smt.executeUpdate("DELETE FROM " + stateId
 					+ " WHERE timestamp > " + checkpointTs
 					+ " AND timestamp < " + recoveryTs);
 		}
@@ -177,10 +177,10 @@ public class MySqlAdapter implements DbAdapter {
 		validateStateId(stateId);
 
 		try (Statement smt = con.createStatement()) {
-			smt.executeUpdate("DELETE state.* FROM kvstate_" + stateId + " AS state"
+			smt.executeUpdate("DELETE state.* FROM " + stateId + " AS state"
 					+ " JOIN"
 					+ " ("
-					+ " 	SELECT MAX(timestamp) AS maxts, k FROM kvstate_" + stateId
+					+ " 	SELECT MAX(timestamp) AS maxts, k FROM " + stateId
 					+ " 	WHERE timestamp BETWEEN " + lowerId + " AND " + upperId
 					+ " 	GROUP BY k"
 					+ " ) m"

http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index c2f306f..5f8610e 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -106,10 +106,10 @@ public class DbStateBackendTest {
 		backend.initializeForJob(env);
 
 		assertNotNull(backend.getConnections());
-		assertTrue(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
+		assertTrue(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
 
 		backend.disposeAllStateForCurrentJob();
-		assertFalse(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
+		assertFalse(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
 		backend.close();
 
 		assertTrue(backend.getConnections().getFirst().isClosed());
@@ -139,12 +139,12 @@ public class DbStateBackendTest {
 		assertEquals(state2, handle2.getState(getClass().getClassLoader()));
 		handle2.discardState();
 
-		assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
+		assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
 
 		assertEquals(state3, handle3.getState(getClass().getClassLoader()));
 		handle3.discardState();
 
-		assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toString()));
+		assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
 
 		backend.close();
 
@@ -168,7 +168,7 @@ public class DbStateBackendTest {
 			LazyDbKvState<Integer, String> kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE,
 					StringSerializer.INSTANCE, null);
 
-			String tableName = "kvstate_" + env.getJobID() + "_1_state1";
+			String tableName = "state1_1_" + env.getJobID().toShortString();
 			assertTrue(isTableCreated(backend.getConnections().getFirst(), tableName));
 
 			assertEquals(0, kv.size());
@@ -257,7 +257,7 @@ public class DbStateBackendTest {
 
 	@Test
 	public void testCaching() throws Exception {
-		
+
 		List<String> urls = Lists.newArrayList(url1, url2);
 		DbBackendConfig conf = new DbBackendConfig("flink", "flink",
 				urls);
@@ -273,7 +273,7 @@ public class DbStateBackendTest {
 
 		Environment env = new DummyEnvironment("test", 2, 0);
 
-		String tableName = "kvstate_" + env.getJobID() + "_1_state1";
+		String tableName = "state1_1_" + env.getJobID().toShortString();
 		assertFalse(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName));
 		assertFalse(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName));
 
@@ -281,10 +281,10 @@ public class DbStateBackendTest {
 
 		LazyDbKvState<Integer, String> kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE,
 				StringSerializer.INSTANCE, "a");
-		
+
 		assertTrue(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName));
 		assertTrue(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName));
-		
+
 		Map<Integer, Optional<String>> cache = kv.getStateCache();
 		Map<Integer, Optional<String>> modified = kv.getModified();
 
@@ -432,7 +432,7 @@ public class DbStateBackendTest {
 			return smt.executeQuery().next();
 		}
 	}
-	
+
 	private static String localFileUri(File path) {
 		return path.toURI().toString();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
index d4fc838..1f13f4b 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
@@ -73,7 +73,7 @@ public class DerbyAdapter extends MySqlAdapter {
 		validateStateId(stateId);
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(
-					"CREATE TABLE kvstate_" + stateId
+					"CREATE TABLE " + stateId
 							+ " ("
 							+ "timestamp bigint, "
 							+ "k varchar(256) for bit data, "
@@ -96,7 +96,7 @@ public class DerbyAdapter extends MySqlAdapter {
 	@Override
 	public String prepareKeyLookup(String stateId) throws SQLException {
 		validateStateId(stateId);
-		return "SELECT v " + "FROM kvstate_" + stateId
+		return "SELECT v " + "FROM " + stateId
 				+ " WHERE k = ? "
 				+ " AND timestamp <= ?"
 				+ " ORDER BY timestamp DESC";
@@ -108,10 +108,10 @@ public class DerbyAdapter extends MySqlAdapter {
 		validateStateId(stateId);
 
 		try (Statement smt = con.createStatement()) {
-			smt.executeUpdate("DELETE FROM kvstate_" + stateId + " t1"
+			smt.executeUpdate("DELETE FROM " + stateId + " t1"
 					+ " WHERE EXISTS"
 					+ " ("
-					+ " 	SELECT * FROM kvstate_" + stateId + " t2"
+					+ " 	SELECT * FROM " + stateId + " t2"
 					+ " 	WHERE t2.k = t1.k"
 					+ "		AND t2.timestamp > t1.timestamp"
 					+ " 	AND t2.timestamp <=" + upperBound
@@ -123,7 +123,7 @@ public class DerbyAdapter extends MySqlAdapter {
 	@Override
 	public String prepareKVCheckpointInsert(String stateId) throws SQLException {
 		validateStateId(stateId);
-		return "INSERT INTO kvstate_" + stateId + " (timestamp, k, v) VALUES (?,?,?)";
+		return "INSERT INTO " + stateId + " (timestamp, k, v) VALUES (?,?,?)";
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index cff82c1..fba5652 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -617,7 +617,7 @@ public class ExecutionVertex implements Serializable {
 			ExecutionAttemptID executionId,
 			SimpleSlot targetSlot,
 			SerializedValue<StateHandle<?>> operatorState,
-			long nextCheckpointId) {
+			long recoveryTimestamp) {
 
 		// Produced intermediate results
 		List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
@@ -652,7 +652,7 @@ public class ExecutionVertex implements Serializable {
 				subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
 				jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
 				producedPartitions, consumedPartitions, jarFiles, classpaths, targetSlot.getRoot().getSlotNumber(),
-				operatorState, nextCheckpointId);
+				operatorState, recoveryTimestamp);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cd8be0b7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index bf086bb..ae1c0cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -538,14 +538,14 @@ public class Task implements Runnable {
 
 			// get our private reference onto the stack (be safe against concurrent changes) 
 			SerializedValue<StateHandle<?>> operatorState = this.operatorState;
-			long nextCheckpointId = this.recoveryTs;
+			long recoveryTs = this.recoveryTs;
 
 			if (operatorState != null) {
 				if (invokable instanceof StatefulTask) {
 					try {
 						StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
 						StatefulTask<?> op = (StatefulTask<?>) invokable;
-						StateUtils.setOperatorState(op, state, nextCheckpointId);
+						StateUtils.setOperatorState(op, state, recoveryTs);
 					}
 					catch (Exception e) {
 						throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);