You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2013/12/09 15:22:50 UTC
svn commit: r1549579 -
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
Author: reschke
Date: Mon Dec 9 14:22:50 2013
New Revision: 1549579
URL: http://svn.apache.org/r1549579
Log:
OAK-1266 - work in progress SQL/JDBC DocumentStore implementation - add modCount support & check
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java?rev=1549579&r1=1549578&r2=1549579&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/sqlpersistence/SQLDocumentStore.java Mon Dec 9 14:22:50 2013
@@ -46,6 +46,8 @@ import org.apache.jackrabbit.oak.plugins
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SQLDocumentStore implements DocumentStore {
@@ -153,6 +155,8 @@ public class SQLDocumentStore implements
// implementation
+ private static final Logger LOG = LoggerFactory.getLogger(SQLDocumentStore.class);
+
private final Comparator<Revision> comparator = Collections.reverseOrder(new StableRevisionComparator());
private Connection connection;
@@ -166,8 +170,8 @@ public class SQLDocumentStore implements
// memory document store
stmt.execute("drop table if exists CLUSTERNODES");
stmt.execute("drop table if exists NODES");
- stmt.execute("create table if not exists CLUSTERNODES(ID varchar primary key, MODIFIED bigint, DATA varchar)");
- stmt.execute("create table if not exists NODES(ID varchar primary key, MODIFIED bigint, DATA varchar)");
+ stmt.execute("create table if not exists CLUSTERNODES(ID varchar primary key, MODIFIED bigint, MODCOUNT bigint, DATA varchar)");
+ stmt.execute("create table if not exists NODES(ID varchar primary key, MODIFIED bigint, MODCOUNT bigint, DATA varchar)");
}
@CheckForNull
@@ -175,8 +179,9 @@ public class SQLDocumentStore implements
try {
for (UpdateOp update : updates) {
T doc = collection.newDocument(this);
+ update.increment("_modCount", 1);
MemoryDocumentStore.applyChanges(doc, update, comparator);
- writeDocument(collection, doc, true);
+ writeDocument(collection, doc, null, true);
}
// FIXME to be atomic
return true;
@@ -200,15 +205,16 @@ public class SQLDocumentStore implements
if (checkConditions && !MemoryDocumentStore.checkConditions(doc, update)) {
return null;
}
+ update.increment("_modCount", 1);
MemoryDocumentStore.applyChanges(doc, update, comparator);
- writeDocument(collection, doc, oldDoc == null);
+ writeDocument(collection, doc, oldDoc != null ? (Long) oldDoc.get("_modCount") : null, oldDoc == null);
doc.seal();
return oldDoc;
}
@CheckForNull
- private <T extends Document> void internalUpdate(Collection<T> collection, List<String> ids, UpdateOp updateOp) {
+ private <T extends Document> void internalUpdate(Collection<T> collection, List<String> ids, UpdateOp update) {
String tableName = getTable(collection);
try {
for (String id : ids) {
@@ -217,9 +223,13 @@ public class SQLDocumentStore implements
throw new MicroKernelException(tableName + " " + id + " not found");
}
T doc = fromString(collection, in);
- MemoryDocumentStore.applyChanges(doc, updateOp, comparator);
+ Long oldmodcount = (Long) doc.get("_modCount");
+ update.increment("_modCount", 1);
+ MemoryDocumentStore.applyChanges(doc, update, comparator);
String data = asString(doc);
- dbUpdate(connection, tableName, id, (Long) doc.get("_modified"), data);
+ Long modified = (Long) doc.get("_modified");
+ Long modcount = (Long) doc.get("_modCount");
+ dbUpdate(connection, tableName, id, modified, modcount, oldmodcount, data);
}
connection.commit();
} catch (Exception ex) {
@@ -318,14 +328,16 @@ public class SQLDocumentStore implements
}
}
- private <T extends Document> void writeDocument(Collection<T> collection, T document, boolean insert) {
+ private <T extends Document> void writeDocument(Collection<T> collection, T document, Long oldmodcount, boolean insert) {
String tableName = getTable(collection);
try {
String data = asString(document);
+ Long modified = (Long) document.get("_modified");
+ Long modcount = (Long) document.get("_modCount");
if (insert) {
- dbInsert(connection, tableName, document.getId(), (Long) document.get("_modified"), data);
+ dbInsert(connection, tableName, document.getId(), modified, modcount, data);
} else {
- dbUpdate(connection, tableName, document.getId(), (Long) document.get("_modified"), data);
+ dbUpdate(connection, tableName, document.getId(), modified, modcount, oldmodcount, data);
}
connection.commit();
} catch (SQLException ex) {
@@ -376,24 +388,40 @@ public class SQLDocumentStore implements
return result;
}
- private void dbUpdate(Connection connection, String tableName, String id, Long modified, String data) throws SQLException {
- PreparedStatement stmt = connection.prepareStatement("UPDATE " + tableName + " SET MODIFIED = ?, DATA = ? WHERE ID = ?");
+ private void dbUpdate(Connection connection, String tableName, String id, Long modified, Long modcount, Long oldmodcount,
+ String data) throws SQLException {
+ String t = "update " + tableName + " set MODIFIED = ?, MODCOUNT = ?, DATA = ? where ID = ?";
+ if (oldmodcount != null) {
+ t += " and MODCOUNT = ?";
+ }
+ PreparedStatement stmt = connection.prepareStatement(t);
try {
stmt.setObject(1, modified, Types.BIGINT);
- stmt.setString(2, data);
- stmt.setString(3, id);
- stmt.executeUpdate();
+ stmt.setObject(2, modcount, Types.BIGINT);
+ stmt.setString(3, data);
+ stmt.setString(4, id);
+ if (oldmodcount != null) {
+ stmt.setObject(5, oldmodcount, Types.BIGINT);
+ }
+ int result = stmt.executeUpdate();
+ if (result != 1) {
+ String message = "update failed for key=" + id + ", modCount=" + modcount + ", oldmodcount=" + oldmodcount;
+ LOG.error(message);
+ throw new MicroKernelException(message);
+ }
} finally {
stmt.close();
}
}
- private void dbInsert(Connection connection, String tableName, String id, Long modified, String data) throws SQLException {
- PreparedStatement stmt = connection.prepareStatement("INSERT INTO " + tableName + " VALUES(?, ?, ?)");
+ private void dbInsert(Connection connection, String tableName, String id, Long modified, Long modcount, String data)
+ throws SQLException {
+ PreparedStatement stmt = connection.prepareStatement("insert into " + tableName + " values(?, ?, ?, ?)");
try {
stmt.setString(1, id);
stmt.setObject(2, modified, Types.BIGINT);
- stmt.setString(3, data);
+ stmt.setObject(3, modcount, Types.BIGINT);
+ stmt.setString(4, data);
stmt.executeUpdate();
} finally {
stmt.close();