You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@labs.apache.org by si...@apache.org on 2011/05/18 14:46:06 UTC
svn commit: r1124229 [2/3] - in /labs/magma/trunk/database-mongodb: ./ src/
src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/magma/ src/main/java/org/apache/magma/database/
src/main/java/org/apache/magma/da...
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/Transaction.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/Transaction.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/Transaction.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/Transaction.java Wed May 18 12:46:03 2011
@@ -0,0 +1,523 @@
+package org.apache.magma.database.mongo;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.persistence.CascadeType;
+import javax.persistence.OptimisticLockException;
+import javax.persistence.PostPersist;
+import javax.persistence.PostUpdate;
+import javax.persistence.PrePersist;
+import javax.persistence.PreRemove;
+import javax.persistence.PreUpdate;
+
+import org.apache.commons.collections.map.IdentityMap;
+import org.apache.magma.basics.MagmaException;
+import org.apache.magma.basics.startup.Cycle;
+import org.apache.magma.beans.BeanData;
+import org.apache.magma.beans.BeanHandler;
+import org.apache.magma.beans.PropertyInfo;
+import org.apache.magma.database.Database;
+import org.apache.magma.database.InstallIdByDefault.WithDefaultId;
+import org.apache.magma.database.openjpa.InstallVersionByDefault.WithDefaultVersion;
+import org.bson.BSONObject;
+
+import com.mongodb.BasicDBList;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteResult;
+import com.mongodb.util.JSON;
+
+public class Transaction {
+
+ // Identifier of this transaction; assigned in the constructor as a hex string.
+ private String id;
+ // Database this transaction operates on.
+ private MongoDatabase db;
+
+ // Per-transaction cache of loaded entities (or the NOTFOUND sentinel), keyed by document id.
+ private Map<String, MongoEntity> mainCache = new HashMap<String, MongoEntity>();
+ // Write concern passed to the inserts and updates issued by commit().
+ private WriteConcern writeConcern;
+ // Cursors handed out by getCursor(); they are closed in clear().
+ private List<MongoCursor> cursors = new ArrayList<MongoCursor>();
+ // Entities registered through save() and delete(); commit() walks this set.
+ private Set<MongoEntity> confirmedSave = new HashSet<MongoEntity>();
+
+ // When true, commit() runs the second phase inline right after the first one.
+ private boolean fastSecondPhase = true;
+
+ // Sentinel cached by load() for ids that have no document, so the lookup is not repeated.
+ private static MongoEntity NOTFOUND = new MongoEntity() {
+
+ };
+
+
+ /**
+  * Creates a transaction bound to the given database, with a freshly
+  * generated hexadecimal id and the SAFE write concern.
+  */
+ public Transaction(MongoDatabase db) {
+     this.writeConcern = WriteConcern.SAFE;
+     this.id = Long.toHexString(LongObjectId.generate());
+     this.db = db;
+ }
+
+
+ /**
+  * Binds the entity's transaction part to this transaction and then
+  * prepares the entity for database use.
+  *
+  * @param entity the entity to attach
+  * @return the entity's transaction part
+  */
+ public TransactionPart addEntity(MongoEntity entity) {
+     final TransactionPart entityPart = entity.getPart();
+     entityPart.setTransaction(this);
+     entity.prepareForDb(this);
+     return entityPart;
+ }
+
+ /** @return the identifier of this transaction */
+ public String getId() {
+     return this.id;
+ }
+
+ public static Transaction getCurrent() {
+ return ((MongoDatabase)new Database()).getTransaction();
+ }
+
+
+
+
+ /**
+  * Looks up an id in this transaction's cache.
+  *
+  * @param clazz only used to type the result; no check against it is made
+  * @param id    the cache key
+  * @return whatever is cached under the id (this may be the "not found"
+  *         marker stored by load()), or null when nothing is cached
+  */
+ public <T extends MongoEntity> T checkCache(Class<T> clazz, String id) {
+     return (T) mainCache.get(id);
+ }
+
+ /** Stores the entity in this transaction's cache under the given id, replacing any previous entry. */
+ public void cache(String id, MongoEntity obj) {
+     this.mainCache.put(id, obj);
+ }
+
+
+ /** Resolves the collection for an entity class through the JPA table name in its bean data. */
+ private <T extends MongoEntity> MongoCollection getCollection(Class<T> clazz) {
+     String tableName = BeanData.getFor(clazz).getJpaTableName();
+     return getCollection(tableName);
+ }
+ private MongoCollection getCollection(String tn) {
+ String dbname = db.getDefdb();
+ if (tn.indexOf('.') > 1) {
+ dbname = tn.substring(0, tn.indexOf('.'));
+ tn = tn.substring(tn.indexOf('.') + 1);
+ }
+ return db.getDb().getCollection(dbname, tn);
+ }
+
+ public <T extends MongoEntity> T load(Class<T> clazz, Object cid) {
+ if (cid == null) return null;
+ String id = null;
+ if (cid instanceof Long && WithDefaultId.class.isAssignableFrom(clazz)) {
+ id = Long.toHexString((Long)cid);
+ } else {
+ id = cid.toString();
+ }
+ MongoCollection coll = getCollection(clazz);
+ DBObject one = null;
+ T ret = checkCache(clazz, id);
+ if (ret == null) {
+ one = coll.findOne(new BasicDBObject("_id", id));
+ if (one == null) {
+ ret = (T)NOTFOUND;
+ } else {
+ // FIXME wait grace period
+ ret = (T)MongoUtils.convertToObject(clazz, one);
+ ret.setFromDb(true);
+ }
+ cache(id, ret);
+ }
+ if (ret == NOTFOUND) {
+ return null;
+ }
+ return ret;
+ }
+
+ // TODO do we need this complexity?
+ // TODO what about polymorphic spread along different collections?
+ /**
+  * Loads several entities at once. Ids already present in the transaction
+  * cache are served from it; the remaining ones are fetched with a single
+  * "$in" query, attached to this transaction and cached. The result follows
+  * the order of the requested ids; ids with no document are simply absent.
+  *
+  * @param clazz entity class, used to pick the collection and convert documents
+  * @param ids   requested ids; each one is used through toString()
+  * @return the entities found, ordered like {@code ids}
+  */
+ public <T extends MongoEntity> List<T> loadBatch(Class<T> clazz, List ids) {
+     MongoCollection coll = getCollection(clazz);
+     final List<T> found = new ArrayList<T>();
+     final List<String> foundIds = new ArrayList<String>();
+     // First position at which each id was requested, used to restore the order.
+     final Map<String, Integer> requestedAt = new HashMap<String, Integer>();
+
+     List<Object> toFetch = new ArrayList<Object>();
+     int position = 0;
+     for (Object id : ids) {
+         String sid = id.toString();
+         if (!requestedAt.containsKey(sid)) {
+             requestedAt.put(sid, position);
+         }
+         position++;
+         T cached = checkCache(clazz, sid);
+         if (cached == null) {
+             toFetch.add(sid);
+         } else if (cached != NOTFOUND) {
+             // load() caches NOTFOUND for absent ids; that marker is not an
+             // entity and must never be handed to the caller.
+             foundIds.add(sid);
+             found.add(cached);
+         }
+     }
+
+     if (toFetch.size() > 0) {
+         MongoCursor cursor = null;
+         try {
+             cursor = coll.find(new BasicDBObject("_id", new BasicDBObject("$in", toFetch)));
+             while (cursor.hasNext()) {
+                 DBObject dbo = cursor.next();
+                 // FIXME wait grace period
+                 T obj = (T) MongoUtils.convertToObject(clazz, dbo);
+                 addEntity(obj);
+                 obj.setFromDb(true);
+                 String oid = (String) dbo.get("_id");
+                 foundIds.add(oid);
+                 cache(oid, obj);
+                 found.add(obj);
+             }
+         } finally {
+             if (cursor != null) cursor.close();
+         }
+     }
+
+     // Sort the indexes of the found entities by requested position. The
+     // sort is stable, so entries with the same position keep their order.
+     List<Integer> order = new ArrayList<Integer>(found.size());
+     for (int i = 0; i < found.size(); i++) {
+         order.add(i);
+     }
+     Collections.sort(order, new Comparator<Integer>() {
+         public int compare(Integer a, Integer b) {
+             int pa = positionOf(foundIds.get(a));
+             int pb = positionOf(foundIds.get(b));
+             if (pa == pb) return 0;
+             return pa < pb ? -1 : 1;
+         }
+
+         private int positionOf(String id) {
+             Integer pos = requestedAt.get(id);
+             return pos == null ? -1 : pos.intValue();
+         }
+     });
+
+     List<T> sorted = new ArrayList<T>(found.size());
+     for (Integer index : order) {
+         sorted.add(found.get(index));
+     }
+     return sorted;
+ }
+
+
+ /**
+  * Registers the bean, and every main entity reachable from it through
+  * PERSIST cascading, to be written by the next commit(). Each entity is
+  * attached to this transaction and gets its PreUpdate handlers fired when
+  * it came from the database, its PrePersist handlers otherwise.
+  */
+ public void save(MongoEntity mpbean) {
+     mpbean.prepareForDb(this);
+     for (MongoEntity main : mpbean.findMainEntities(CascadeType.PERSIST, false)) {
+         main.prepareForDb(this);
+         main.setOnDb(true);
+         addEntity(main);
+         confirmedSave.add(main);
+         if (!main.isFromDb()) {
+             main.fireJpaHandlers(PrePersist.class);
+         } else {
+             main.fireJpaHandlers(PreUpdate.class);
+         }
+     }
+ }
+
+
+ /**
+  * Registers the bean, and every main entity reachable from it through
+  * REMOVE cascading, for deletion by the next commit(). The transaction part
+  * of each entity is flagged as deleted and its PreRemove handlers are fired.
+  */
+ public void delete(MongoEntity mpbean) {
+     mpbean.prepareForDb(this);
+     for (MongoEntity main : mpbean.findMainEntities(CascadeType.REMOVE, false)) {
+         main.prepareForDb(this);
+         main.setOnDb(false);
+         TransactionPart mainPart = addEntity(main);
+         mainPart.deleted();
+         confirmedSave.add(main);
+         main.fireJpaHandlers(PreRemove.class);
+     }
+ }
+
+
+ /**
+  * Opens a cursor over the entities of the given class matching a JSON
+  * query. A top level "$orderby" entry is taken out of the query and applied
+  * as the sort order. The cursor is remembered so that clear() can close it.
+  *
+  * @param clazz entity class, selects the collection and the class filter
+  * @param json  JSON query, may be null or empty to match everything
+  * @param args  values for the "$1", "$2", ... placeholders of the query
+  * @return the opened cursor
+  */
+ public <T extends MongoEntity> MongoCursor getCursor(Class<T> clazz, String json, Object... args) {
+     MongoCollection coll = getCollection(clazz);
+     // createQuery() never returns null, so there is no "no query" case here;
+     // the old null check sat after the object had already been dereferenced.
+     DBObject query = createQuery(clazz, json, args);
+     DBObject sort = null;
+     if (query.containsField("$orderby")) {
+         sort = (DBObject) query.get("$orderby");
+         query.removeField("$orderby");
+     }
+     MongoCursor find = coll.find(query);
+     if (sort != null) {
+         find.sort(sort);
+     }
+     this.cursors.add(find);
+     return find;
+ }
+
+ /** Counts the documents of the given class matching the JSON query (placeholders filled from args). */
+ public <T extends MongoEntity> int count(Class<T> clazz, String json, Object... args) {
+     final MongoCollection target = getCollection(clazz);
+     return target.count(createQuery(clazz, json, args));
+ }
+
+ /**
+  * Builds the query object for a class: the converted JSON query (or an
+  * empty object) plus a "_jcl" filter made of the names the metadata gives
+  * for the class' subs. A single name is matched directly, any other number
+  * of names through "$in".
+  */
+ private <T> DBObject createQuery(Class<T> clazz, String json,
+         Object... args) {
+     DBObject query = convertToQuery(json, args);
+     if (query == null) {
+         query = new BasicDBObject();
+     }
+     MongoMetadata metadata = getMeta();
+     List<String> classNames = metadata.convertToNames(metadata.findSubs(clazz));
+     Object classFilter;
+     if (classNames.size() != 1) {
+         BasicDBList anyOf = new BasicDBList();
+         anyOf.addAll(classNames);
+         classFilter = new BasicDBObject("$in", anyOf);
+     } else {
+         classFilter = classNames.get(0);
+     }
+     query.put("_jcl", classFilter);
+     return query;
+ }
+
+
+
+
+ public DBObject convertToQuery(String json, Object... args) {
+ if (json == null || json.length() == 0) {
+ return new BasicDBObject();
+ } else {
+ DBObject dbo = (DBObject) JSON.parse(json);
+ recurseReplace(dbo, args);
+ return dbo;
+ }
+ }
+
+ /**
+  * Walks a query object and replaces every string value of the form "$n"
+  * (n starting at 1) with the n-th argument, converted for MongoDB. Nested
+  * objects are visited recursively; string elements of plain lists are
+  * replaced in place.
+  *
+  * @param dbo  the query to rewrite, may be null
+  * @param args replacement values; "$1" maps to args[0]
+  */
+ private void recurseReplace(DBObject dbo, Object[] args) {
+     if (dbo == null) return;
+     Set<String> ks = dbo.keySet();
+     for (String kn : ks) {
+         Object ob = dbo.get(kn);
+         if (ob instanceof DBObject) {
+             recurseReplace((DBObject) ob, args);
+         } else if (ob instanceof String) {
+             int i = checkIndex((String) ob);
+             if (i >= 0) {
+                 dbo.put(kn, MongoUtils.convertToMongo(args[i]));
+             }
+         } else if (ob instanceof List) {
+             List l = (List) ob;
+             for (int li = 0; li < l.size(); li++) {
+                 Object val = l.get(li);
+                 if (val instanceof String) {
+                     // Inspect the element, not the list: casting the list
+                     // itself to String threw a ClassCastException.
+                     int i = checkIndex((String) val);
+                     if (i >= 0) {
+                         l.set(li, MongoUtils.convertToMongo(args[i]));
+                     }
+                 }
+             }
+         }
+     }
+ }
+
+ /**
+  * Decodes a "$n" placeholder.
+  *
+  * @return n - 1 when the text is "$" followed by an integer, -1 otherwise
+  */
+ private int checkIndex(String ob) {
+     if (!ob.startsWith("$")) {
+         return -1;
+     }
+     int index;
+     try {
+         index = Integer.parseInt(ob.substring(1)) - 1;
+     } catch (NumberFormatException e) {
+         index = -1;
+     }
+     return index;
+ }
+
+ /**
+  * Validates the outcome of a write.
+  *
+  * @param wr     the result to inspect
+  * @param insert true for inserts, which skip the affected-count check
+  * @param entity the entity written, or null for the transaction document
+  * @throws MagmaException          when the result carries an error, or a
+  *                                 non-insert on the transaction touched nothing
+  * @throws OptimisticLockException when a non-insert on an entity touched nothing
+  */
+ private void checkResult(WriteResult wr, boolean insert, MongoEntity entity) {
+     Object subject = (entity == null) ? "transaction" : entity;
+     if (wr.getError() != null) {
+         throw new MagmaException("Error {0} while saving {1}", wr.toString(), subject);
+     }
+     if (insert || wr.getN() != 0) {
+         return;
+     }
+     if (entity != null) {
+         throw new OptimisticLockException(entity);
+     }
+     throw new MagmaException("Server returned 0 inserts while saving {0}", subject);
+ }
+
+ /**
+ * This method commits the transaction. Actually this is a two phase commit, so this
+ * method merely adds the transaction part connected to each bean to its mongodb document.
+ *
+ * For entities that do not have a document already, a new document is inserted holding
+ * only the version, the transaction part and the id.
+ *
+ * Optimistic locking is checked during update; if everything goes well, the transaction
+ * document (stored in the "transactions" collection) is marked as done, so that the second
+ * phase of the commit can work properly.
+ * If a write fails (for example on an optimistic lock), the transaction document is marked
+ * as failed (on a best-effort basis) and the exception is rethrown.
+ *
+ * Nothing is written when none of the registered entities is dirty.
+ */
+ public void commit() {
+ // NOTE(review): mdb is not used in this method.
+ MongoDB mdb = this.db.getDb();
+ Set<MongoEntity> reallySaving = new HashSet<MongoEntity>();
+ // Maps collection name -> list of entity ids touched by this transaction.
+ BasicDBObject entlist = new BasicDBObject();
+ for (MongoEntity ent : this.confirmedSave) {
+ if (!ent.isDirty()) continue;
+ TransactionPart part = ent.getPart();
+ if (part == null) continue;
+ MongoCollection coll = getCollection(ent.getClass());
+ String id = MongoUtils.getMongoId(ent);
+
+ reallySaving.add(ent);
+
+ BasicDBList colllist = (BasicDBList) entlist.get(coll.getName());
+ if (colllist == null) {
+ colllist = new BasicDBList();
+ entlist.put(coll.getName(), colllist);
+ }
+ colllist.add(id);
+ }
+ if (reallySaving.size() == 0) return;
+
+ // Record the transaction itself first, in state "R" (running).
+ MongoCollection trcoll = getCollection("transactions");
+ BasicDBObject mydbo = new BasicDBObject();
+ mydbo.put("_id", this.id);
+ // XXX should use a valid (global) time source here
+ mydbo.put("ts", System.currentTimeMillis());
+ mydbo.put("state", "R");
+ mydbo.put("entities", entlist);
+ this.checkResult(trcoll.insert(mydbo, writeConcern), true, null);
+
+ try {
+ for (MongoEntity ent : reallySaving) {
+ TransactionPart part = ent.getPart();
+ MongoCollection coll = getCollection(ent.getClass());
+ String id = MongoUtils.getMongoId(ent);
+ // NOTE(review): wr is never assigned or read.
+ WriteResult wr = null;
+
+ BasicDBObject search = new BasicDBObject("_id", id);
+ if (ent.isFromDb()) {
+ // Existing document: push the part onto its __transactions list.
+ BasicDBObject update = new BasicDBObject(
+ "$push",
+ new BasicDBObject(
+ "__transactions", part));
+
+
+ long nversion = -1;
+ // Check for version field for optimistic lock
+ BeanData bd = ent.beanData();
+ BeanHandler bh = ent.handler();
+ PropertyInfo vf = bd.getJpaVersionField();
+ if (ent instanceof WithDefaultVersion) {
+ long version = ((WithDefaultVersion)ent).getVersion();
+ // The current version is only added to the search when it is positive.
+ if (version > 0) search.append("version", version);
+ ((WithDefaultVersion)ent).setVersion(version + 1);
+ // We need this because setVersion is not weaved by MongoEntity Impls cause of LTW bugs
+ ent.checkGetBson().put("version", version + 1);
+ nversion = version + 1;
+ } else {
+ long version = 0;
+ if (vf != null) {
+ version = 1;
+ try {
+ version = ((Number)bh.getValue(vf.getName())).longValue();
+ // A null version value leaves the default of 1 in place.
+ } catch (NullPointerException e) {}
+ search.append(vf.getJpaColName(), version);
+ bh.setValue(vf.getName(), version + 1);
+ nversion = version + 1;
+ // We need this because setVersion is not weaved by MongoPersistedImpl casue of LTW bugs
+ ent.checkGetBson().put(vf.getJpaColName(), version + 1);
+ bh.commit();
+ }
+ }
+ // NOTE(review): this always writes the literal "version" field, even when the
+ // version property maps to another column name, and writes -1 when the
+ // entity has no version field at all - confirm that this is intended.
+ update.put("$set", new BasicDBObject("version", nversion));
+
+ // No document matched (getN() == 0) surfaces as OptimisticLockException.
+ this.checkResult(coll.update(search, update, false, false, writeConcern), false, ent);
+ } else {
+ // New document: insert a stub holding version 1, the part and the id.
+ BasicDBObject insert = new BasicDBObject();
+ part.populateBasics(ent);
+ insert.put("version", 1);
+ BasicDBList trlist = new BasicDBList();
+ trlist.add(part);
+ insert.put("__transactions", trlist);
+ insert.put("_id", MongoUtils.getMongoId(ent));
+
+ this.checkResult(coll.insert(insert, writeConcern), true, ent);
+ }
+
+ }
+ } catch (Exception e) {
+ try {
+ // Best effort: flag the transaction document as failed ("F").
+ mydbo.put("state", "F");
+ this.checkResult(trcoll.update(new BasicDBObject("_id", this.id), mydbo, false, false, writeConcern), false, null);
+ } catch (Throwable e1) {
+ // Deliberately ignored so that the original failure is the one reported.
+ }
+ if (e instanceof RuntimeException)
+ throw (RuntimeException)e;
+ throw new MagmaException(e, "Error while persisting to MongoDB");
+ }
+
+ // All entities updated, save the transaction itself
+ // NOTE(review): the local copy gets cstate "C" while the stored document is
+ // set to cstate "D" just below - confirm that the difference is intended.
+ mydbo.put("state", "D");
+ mydbo.put("cstate", "C");
+ {
+ BasicDBObject set = new BasicDBObject("state", "D");
+ set.put("cstate", "D");
+ this.checkResult(
+ trcoll.update(
+ new BasicDBObject("_id", this.id),
+ new BasicDBObject("$set", set),
+ false, false, writeConcern)
+ , false, null);
+ }
+ if (this.isFastSecondPhase()) {
+ try {
+ // Run the second phase inline; a failure here is only printed.
+ TransactionSecondPhaseThread tspt = new TransactionSecondPhaseThread(this.db.getDb(), this.db.getDefdb());
+ tspt.fastHandle(mydbo);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+
+ // Forget what was written and fire the post-write JPA handlers.
+ this.confirmedSave.removeAll(reallySaving);
+ for (MongoEntity ent : reallySaving) {
+ ent.clearDirty();
+ if (ent.isFromDb()) {
+ ent.fireJpaHandlers(PostUpdate.class);
+ } else {
+ ent.fireJpaHandlers(PostPersist.class);
+ ent.setFromDb(true);
+ }
+ }
+ }
+
+ /**
+  * Re-applies onto the entity's document every pending transaction part
+  * whose transaction is recorded with state "D"; parts of unknown or
+  * unfinished transactions are skipped.
+  *
+  * @throws MagmaException when "__transactions" is not a list or holds
+  *                        something that is not a BSON object
+  */
+ void replayValids(MongoEntity ent) {
+     BSONObject document = ent.checkGetBson();
+     Object rawParts = document.get("__transactions");
+     if (rawParts == null) {
+         return;
+     }
+     if (!(rawParts instanceof List)) {
+         throw new MagmaException("Expected a list in __transactions in {0}/{1}", ent.getClass(), MongoUtils.getMongoId(ent));
+     }
+
+     MongoCollection transactions = getCollection("transactions");
+     DBObject stateOnly = new BasicDBObject("state", 1);
+     for (Object rawPart : (List) rawParts) {
+         if (!(rawPart instanceof BSONObject)) {
+             throw new MagmaException("Expected a bson object inside __transactions in {0}/{1}", ent.getClass(), MongoUtils.getMongoId(ent));
+         }
+         TransactionPart tp = new TransactionPart((BSONObject) rawPart);
+         DBObject trans = transactions.findOne(new BasicDBObject("_id", tp.getTransactionId()), stateOnly);
+         // Anything but a recorded "D" state is an unfinished or dead transaction.
+         boolean committed = trans != null && trans.get("state") != null && trans.get("state").equals("D");
+         if (committed) {
+             tp.reapply(document);
+         }
+     }
+ }
+
+
+ /** Drops the pending saves, closes and forgets every cursor handed out, and empties the cache. */
+ public void clear() {
+     confirmedSave.clear();
+     for (MongoCursor open : cursors) {
+         open.close();
+     }
+     cursors.clear();
+     mainCache.clear();
+ }
+
+ /** @return the metadata of the database this transaction works on */
+ public MongoMetadata getMeta() {
+     return db.getMeta();
+ }
+
+
+ /** @return whether commit() runs the second phase inline */
+ public boolean isFastSecondPhase() {
+     return this.fastSecondPhase;
+ }
+
+ /** @param fastSecondPhase true to let commit() run the second phase inline */
+ public void setFastSecondPhase(boolean fastSecondPhase) {
+     this.fastSecondPhase = fastSecondPhase;
+ }
+
+
+}
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/TransactionPart.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/TransactionPart.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/TransactionPart.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/TransactionPart.java Wed May 18 12:46:03 2011
@@ -0,0 +1,84 @@
+package org.apache.magma.database.mongo;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.magma.beans.BeanData;
+import org.apache.magma.beans.BeanHandler;
+import org.apache.magma.beans.PropertyInfo;
+import org.bson.BSONObject;
+import org.junit.runner.manipulation.Sortable;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+
+public class TransactionPart extends BasicDBObject implements Comparable<TransactionPart> {
+
+ public TransactionPart() {
+ }
+
+ public TransactionPart(BSONObject other) {
+ this.putAll(other);
+ }
+
+ public void setTransaction(Transaction t) {
+ if (getTransactionId() != null) {
+ if (getTransactionId().equals(t.getId())) return;
+ if (this.keySet().size() > 1) {
+ throw new IllegalStateException("Moving modified entity between two transactions");
+ }
+ }
+ this.put("__transactionId", t.getId());
+ }
+
+ public void unset(PropertyInfo prop) {
+ this.put(prop.getJpaColName(), null);
+ }
+
+ public void set(PropertyInfo prop, Object value) {
+ if (!(value instanceof DBObject)) {
+ value = MongoUtils.convertToMongo(value);
+ }
+ this.put(prop.getJpaColName(), value);
+ }
+
+ public String getTransactionId() {
+ return this.getString("__transactionId");
+ }
+ public void deleted() {
+ this.put("__deleted", true);
+ }
+ public boolean isDeleted() {
+ return this.getBoolean("__deleted", false);
+ }
+ public void populateBasics(MongoEntity ent) {
+ BSONObject bson = ent.checkGetBson();
+ this.put("_jcl", bson.get("_jcl"));
+ BeanData db = ent.beanData();
+ BeanHandler handler = ent.handler();
+ Set<String> names = db.getPropertyNames();
+ for (String propname : names) {
+ PropertyInfo pi = db.getProperty(propname);
+ if (!pi.isBasicType()) continue;
+ if (!pi.isWriteable() || pi.isJpaTransient()) continue;
+ if (pi.isJpaId() || pi.isJpaVersion()) continue;
+ Object val = handler.getValue(propname);
+ ent.dirtyModified(pi, val);
+ }
+ }
+ public void reapply(BSONObject doc) {
+ for (String key : this.keySet()) {
+ if (key.startsWith("__")) continue;
+ Object object = this.get(key);
+ if (object == null) {
+ doc.removeField(key);
+ } else {
+ doc.put(key, object);
+ }
+ }
+ }
+ public int compareTo(TransactionPart o) {
+ return this.getTransactionId().compareTo(o.getTransactionId());
+ }
+
+}
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/TransactionSecondPhaseThread.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/TransactionSecondPhaseThread.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/TransactionSecondPhaseThread.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/TransactionSecondPhaseThread.java Wed May 18 12:46:03 2011
@@ -0,0 +1,514 @@
+package org.apache.magma.database.mongo;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.beanutils.converters.StringArrayConverter;
+import org.bson.BSONObject;
+
+import com.mongodb.BasicDBList;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.Mongo;
+import com.mongodb.ServerAddress;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteResult;
+
+public class TransactionSecondPhaseThread implements Runnable {
+
+ // Connection wrapper used to reach the collections.
+ private MongoDB db;
+ // Name of the database holding the "transactions" collection and the entity collections.
+ private String dbname;
+ // Write concern used for the transaction document updates, cleanup and entity removals.
+ private WriteConcern concern = WriteConcern.SAFE;
+ // Milliseconds run() sleeps between polling rounds.
+ private long sleepPeriod = 500;
+ // NOTE(review): only written by its setter; not read anywhere in the visible code.
+ private boolean backupTransactions = false;
+
+ /**
+  * @param mdb    connection wrapper used to reach the collections
+  * @param dbname name of the database to work on
+  */
+ public TransactionSecondPhaseThread(MongoDB mdb, String dbname) {
+     this.dbname = dbname;
+     this.db = mdb;
+ }
+
+ /** @param sleepPeriod milliseconds run() sleeps between polling rounds */
+ public void setSleepPeriod(long sleepPeriod) {
+     this.sleepPeriod = sleepPeriod;
+ }
+
+ /** @param backupTransactions new value of the backup flag */
+ public void setBackupTransactions(boolean backupTransactions) {
+     this.backupTransactions = backupTransactions;
+ }
+
+ /**
+  * Polls the transactions collection until the thread is interrupted.
+  * After each round the collected statistics are printed (and reset) at most
+  * once every three seconds. The loop sleeps {@code sleepPeriod} milliseconds
+  * between rounds, unless some transaction is waiting, in which case it polls
+  * again at once. Failures of a round are printed and followed by a sleep.
+  */
+ public void run() {
+     long lastreport = 0;
+     long reportevery = 3000;
+     while (!Thread.currentThread().isInterrupted()) {
+         try {
+             poll();
+             if (System.currentTimeMillis() > lastreport + reportevery) {
+                 System.out.println(logger);
+                 logger.clear();
+                 lastreport = System.currentTimeMillis();
+             }
+             if (logger.waiting > 0) continue;
+             Thread.sleep(sleepPeriod);
+         } catch (InterruptedException e) {
+             // Interruption is the only way to stop this loop: keep the flag
+             // set for the caller and leave.
+             Thread.currentThread().interrupt();
+             return;
+         } catch (Exception e) {
+             // XXX log it better
+             e.printStackTrace();
+             try {
+                 Thread.sleep(sleepPeriod);
+             } catch (InterruptedException e1) {
+                 Thread.currentThread().interrupt();
+                 return;
+             }
+         }
+     }
+ }
+
+ public static class Logger {
+ public int running = 0;
+ public int committing = 0;
+ public int done = 0;
+ public int waiting = 0;
+ public List<String> errors = null;
+ public List<String> warnings = null;
+ public int entitiesDone;
+ public int entitiesSkip;
+ public int backedUp = 0;
+ public int faileds = 0;
+
+ public void clear() {
+ running = 0;
+ committing = 0;
+ done = 0;
+ waiting = 0;
+ errors = null;
+ warnings = null;
+ entitiesDone = 0;
+ entitiesSkip = 0;
+ backedUp = 0;
+ faileds = 0;
+ }
+
+ public void error(String err) {
+ if (errors == null) errors = new ArrayList<String>();
+ errors.add(err);
+ }
+ public void warning(String err) {
+ if (warnings == null) warnings = new ArrayList<String>();
+ warnings.add(err);
+ }
+ public void error(String id, String err) {
+ this.error(id + " - " + err);
+ }
+ public void warning(String id, String err) {
+ this.warning(id + " - " + err);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Round results:\n");
+ sb.append(" running transactions: " + this.running);
+ sb.append("\n");
+ sb.append(" committing transactions: " + this.committing);
+ sb.append("\n");
+ sb.append(" done transactions: " + this.done);
+ sb.append("\n");
+ sb.append(" faileds: " + this.faileds);
+ sb.append("\n");
+ sb.append(" backed up: " + this.backedUp);
+ sb.append("\n");
+ sb.append(" waited transactions: " + this.waiting);
+ sb.append("\n");
+ sb.append(" done entities: " + this.entitiesDone);
+ sb.append("\n");
+ sb.append(" skipped entities: " + this.entitiesSkip);
+ sb.append("\n");
+ if (this.warnings != null) {
+ sb.append(" warnings :\n");
+ for (String err : this.warnings) {
+ sb.append(err);
+ sb.append("\n");
+ }
+ }
+ if (this.errors != null) {
+ sb.append(" errors :\n");
+ for (String err : this.errors) {
+ sb.append(err);
+ sb.append("\n");
+ }
+ }
+ return sb.toString();
+ }
+ }
+
+ // Statistics sink for the polling rounds; init() recreates it when it was set to null.
+ private Logger logger = new Logger();
+
+ /** @return the statistics collector currently in use */
+ public Logger getLogger() {
+     return this.logger;
+ }
+
+ /** @param logger the statistics collector to use from now on */
+ public void setLogger(Logger logger) {
+     this.logger = logger;
+ }
+
+
+ /**
+  * Local snapshot of one document of the transactions collection, together
+  * with the time it was taken. A snapshot older than five seconds is
+  * "locally expired": every state predicate then answers false, so callers
+  * are expected to reload it.
+  */
+ class TransactionState {
+     private DBObject dbo;
+     private long localTs;
+
+     TransactionState(DBObject dbo) {
+         this.dbo = dbo;
+         localTs = System.currentTimeMillis();
+     }
+
+     String getId() {
+         return (String) this.dbo.get("_id");
+     }
+
+     /** @return the "state" field, or "U" when the document has none */
+     private String getState() {
+         String ret = (String) this.dbo.get("state");
+         if (ret == null) return "U";
+         return ret;
+     }
+
+     /** @return the "cstate" field, possibly null */
+     private String getSecondState() {
+         String ret = (String) this.dbo.get("cstate");
+         return ret;
+     }
+
+     boolean isDone() {
+         return !isLocallyExpired() && getState().equals("D");
+     }
+
+     boolean isRunning() {
+         return !isLocallyExpired() && getState().equals("R");
+     }
+
+     boolean isFailed() {
+         return !isLocallyExpired() && getState().equals("F");
+     }
+
+     boolean isMissing() {
+         return !isLocallyExpired() && getState().equals("M");
+     }
+
+     /** @return true when the snapshot is fresh and carries any "cstate" value */
+     boolean isCommitting() {
+         return !isLocallyExpired() && getSecondState() != null;
+     }
+
+     /**
+      * @return true when the snapshot is fresh and its "ts" timestamp is more
+      *         than 60 seconds old; a missing or non-numeric timestamp is
+      *         logged as a warning and reported as not timed out
+      */
+     boolean isTimedOut() {
+         if (isLocallyExpired()) return false;
+         Object tsobj = dbo.get("ts");
+         if (tsobj == null) {
+             logger.warning(getId(),"has no timestamp");
+             return false;
+         }
+         if (!(tsobj instanceof Number)) {
+             logger.warning(getId(),"has an invalid timestamp");
+             return false;
+         }
+         long ts = ((Number) tsobj).longValue();
+         // XXX should use a valid (global) time source here
+         return (System.currentTimeMillis() > ts + 60000);
+     }
+
+     /**
+      * @return the "entities" object, mapping a collection name to the list
+      *         of entity ids touched in that collection, or null (after
+      *         logging an error) when it is missing or not an object
+      */
+     DBObject getEntityIds() {
+         Object tridsObj = this.dbo.get("entities");
+         if (tridsObj == null) {
+             logger.error(getId(),"does not have entities property at all!");
+             return null;
+         }
+         if (!(tridsObj instanceof DBObject)) {
+             logger.error(getId(),"has something else than an object in entities property!");
+             return null;
+         }
+         DBObject trids = (DBObject) tridsObj;
+         return trids;
+     }
+
+     boolean isLocallyExpired() {
+         return System.currentTimeMillis() > localTs + 5000;
+         // FIXME reenable this for debug
+         //return false;
+     }
+
+     /** Tries to claim the transaction by setting cstate to "C". */
+     boolean workOnMe() {
+         return markAs("C");
+     }
+
+     /** Releases the claim by unsetting cstate. */
+     void doneWorking() {
+         markAs(null);
+     }
+
+     /** Flags the transaction for removal by setting cstate to "D". */
+     void remove() {
+         markAs("D");
+     }
+
+     /**
+      * Changes "cstate" on the stored document, but only if the stored value
+      * still equals the one in this snapshot; the current entities object is
+      * written back in the same update.
+      *
+      * @param cstate the new value, or null to unset the field
+      * @return true when exactly one document was updated; otherwise the
+      *         snapshot is expired because it is known to be stale
+      */
+     boolean markAs(String cstate) {
+         BasicDBObject search = new BasicDBObject();
+         search.put("_id", getId());
+         if (getSecondState() == null) {
+             search.put("cstate", new BasicDBObject("$exists", false));
+         } else {
+             search.put("cstate", getSecondState());
+         }
+
+         BasicDBObject upd = new BasicDBObject();
+         BasicDBObject set = new BasicDBObject();
+         if (cstate == null) {
+             upd.put("$unset", new BasicDBObject("cstate", 1));
+         } else {
+             set.put("cstate", cstate);
+         }
+         set.put("entities", getEntityIds());
+         upd.put("$set", set);
+         boolean done = false;
+         try {
+             WriteResult wr = trans.update(search, upd, false, false, concern);
+             done = (wr.getError() == null && wr.getN() == 1);
+             dbo.put("cstate", cstate);
+         } finally {
+             if (!done) {
+                 // Expire this, we have stale data
+                 localTs = 0;
+             }
+         }
+         return done;
+     }
+
+     /**
+      * Removes an entity id from the list kept for its collection, so that
+      * the next markAs() stores only the entities still to be handled.
+      * Does nothing when there is no entities object or no list for the
+      * collection.
+      *
+      * @param coll  collection name, the key inside the entities object
+      * @param entid id of the entity that has been handled
+      */
+     public void done(String coll, String entid) {
+         DBObject entitiesByColl = getEntityIds();
+         if (entitiesByColl == null) return;
+         // Look up the given collection: the literal key "coll" never exists,
+         // so nothing was ever removed.
+         Object list = entitiesByColl.get(coll);
+         if (!(list instanceof List)) return;
+         ((List) list).remove(entid);
+     }
+ }
+
+ // The "transactions" collection; resolved lazily by init().
+ private MongoCollection trans;
+ // Snapshots of transaction documents, keyed by transaction id.
+ private Map<String, TransactionState> states = new HashMap<String, TransactionState>();
+ // NOTE(review): never read or written in the visible code.
+ private SortedSet<String> stateIds = new TreeSet<String>();
+
+ /** Makes sure a logger exists and the "transactions" collection is resolved; safe to call repeatedly. */
+ public void init() {
+     if (this.logger == null) {
+         this.logger = new Logger();
+     }
+     if (this.trans == null) {
+         this.trans = this.db.getCollection(this.dbname, "transactions");
+     }
+ }
+
+ /**
+  * Returns the snapshot of a transaction, reusing the held one while it is
+  * fresh. When the held snapshot has expired, all expired snapshots are
+  * dropped and the document is read again. A transaction with no document
+  * is reported through a synthetic snapshot in state "M", which is not kept.
+  */
+ private TransactionState loadTransaction(String id) {
+     TransactionState known = states.get(id);
+     if (known != null) {
+         if (!known.isLocallyExpired()) {
+             return known;
+         }
+         Iterator<TransactionState> held = states.values().iterator();
+         while (held.hasNext()) {
+             if (held.next().isLocallyExpired()) {
+                 held.remove();
+             }
+         }
+     }
+     DBObject stored = trans.findOne(new BasicDBObject("_id", id));
+     if (stored != null) {
+         return createTransaction(stored);
+     }
+     DBObject missing = new BasicDBObject();
+     missing.put("_id", id);
+     missing.put("state", "M");
+     missing.put("entities", new BasicDBList());
+     return new TransactionState(missing);
+ }
+
+ /** Wraps a transaction document in a fresh snapshot and keeps it under the document's id. */
+ private TransactionState createTransaction(DBObject dbo) {
+     String transactionId = (String) dbo.get("_id");
+     TransactionState snapshot = new TransactionState(dbo);
+     states.put(transactionId, snapshot);
+     return snapshot;
+ }
+
+ /**
+  * Runs one round: removes the transaction documents flagged for removal,
+  * then handles every document left in the transactions collection. The
+  * cursor is closed even when handling a transaction fails.
+  */
+ public void poll() {
+     init();
+     cleanup();
+     MongoCursor find = trans.find();
+     try {
+         while (find.hasNext()) {
+             DBObject tr = find.next();
+             TransactionState ts = createTransaction(tr);
+             handle(ts, false);
+         }
+     } finally {
+         find.close();
+     }
+ }
+
+ public void cleanup() {
+ trans.remove(new BasicDBObject("cstate", "D"), concern);
+ }
+
+ /**
+  * Handles a single transaction document right away, skipping the state
+  * checks and the claim that a polling round performs first.
+  */
+ public void fastHandle(DBObject trdbo) {
+     init();
+     handle(createTransaction(trdbo), true);
+ }
+
+ /**
+  * Runs the second phase for one transaction: merges the pending
+  * transaction parts into every entity document the transaction lists.
+  * When every entity could be merged the transaction is flagged for
+  * removal; otherwise the claim on it is released so that a later round
+  * retries.
+  *
+  * @param ts   snapshot of the transaction to handle
+  * @param fast when true, skip the state checks and the claim
+  */
+ private void handle(TransactionState ts, boolean fast) {
+     if (!fast) {
+         if (ts.isLocallyExpired()) ts = loadTransaction(ts.getId());
+         // This transaction is missing, maybe I have stale data
+         if (ts.isMissing()) return;
+         // This transaction is committing right now, and is not yet timed out, skip it
+         if (ts.isCommitting() && !ts.isTimedOut()) {
+             logger.committing++;
+             return;
+         }
+         // This transaction is running right now, and is not yet timed out, skip it
+         if (ts.isRunning() && !ts.isTimedOut()) {
+             logger.running++;
+             return;
+         }
+
+         // Otherwise we can handle it
+         if (!ts.workOnMe()) return;
+     }
+     boolean finished = false;
+     try {
+         DBObject entitiesInColl = ts.getEntityIds();
+         if (entitiesInColl == null) {
+             // getEntityIds() has already logged why; the finally block
+             // releases the claim instead of failing with a NullPointerException.
+             return;
+         }
+         boolean alldone = true;
+         for (String collname : entitiesInColl.keySet()) {
+             Object entitiesObj = entitiesInColl.get(collname);
+             if (!(entitiesObj instanceof List)) {
+                 logger.error(ts.getId(),"has something else than a List in its entities");
+                 continue;
+             }
+             List entities = (List) entitiesObj;
+             String colldb = dbname;
+             MongoCollection entcoll = db.getCollection(colldb, collname);
+             // TODO restore the warning for listed entities that are not there anymore
+             // Query with a copy of the id list: ts.done() edits the original
+             // list while the cursor is still being read.
+             MongoCursor entititesCur = entcoll.find(
+                     new BasicDBObject("_id", new BasicDBObject("$in", new ArrayList<Object>(entities))));
+             try {
+                 while (entititesCur.hasNext()) {
+                     DBObject entdoc = entititesCur.next();
+                     String entid = (String) entdoc.get("_id");
+                     boolean merged = mergeTransactions(entdoc, colldb, collname, entid);
+                     if (!merged) {
+                         logger.warning(ts.getId(),"could not merge entity " + colldb + "/" + collname + "/" + entid);
+                     } else {
+                         ts.done(collname, entid);
+                     }
+                     alldone &= merged;
+                 }
+             } finally {
+                 entititesCur.close();
+             }
+         }
+         if (alldone) {
+             ts.remove();
+             logger.done++;
+             if (ts.isFailed()) logger.faileds++;
+         } else {
+             ts.doneWorking();
+             logger.waiting++;
+         }
+         finished = true;
+     } finally {
+         if (!finished) {
+             ts.doneWorking();
+         }
+     }
+
+ }
+
+ /**
+ * Merges the pending transaction parts of one entity document into the document itself.
+ * Parts are processed in transaction id order. Parts of done (or missing) transactions are
+ * applied until a part of a still running transaction is met; parts of failed transactions
+ * are discarded. The document is then written back without the handled parts, or removed
+ * when one of its parts is flagged as deleted. Both writes are conditional on the
+ * document's current "version".
+ *
+ * @param entdoc the entity document, modified in place
+ * @param colldb database name, used for logging and to reach the collection
+ * @param collname collection name
+ * @param entid entity id, used for logging
+ * @return false only when removing the document reported an error, true otherwise
+ */
+ public boolean mergeTransactions(DBObject entdoc, String colldb, String collname, String entid) {
+ String logid = colldb+":"+collname+":"+entid;
+
+ Object entTrObj = entdoc.get("__transactions");
+ if (entTrObj == null) {
+ logger.warning(logid, "entity has no transactions, probably it has already been committed");
+ return true;
+ }
+ if (!(entTrObj instanceof List)) {
+ logger.error(logid,"transactions are not a list !?? Nothing we can do anyway");
+ return true;
+ }
+ List entTr = (List) entTrObj;
+ List<TransactionPart> parts = new ArrayList<TransactionPart>();
+ for (Object entTrPartObj : entTr) {
+ if (!(entTrPartObj instanceof BSONObject)) {
+ logger.error(logid,"there is a part which is not a BSONObject!? Nothing we can do anyway");
+ continue;
+ }
+ TransactionPart entTrPart = new TransactionPart((BSONObject)entTrPartObj);
+ parts.add(entTrPart);
+ }
+ // Order the parts by transaction id (TransactionPart.compareTo).
+ Collections.sort(parts);
+
+
+ boolean deleted = false;
+ boolean runningfound = false;
+ // Ids of the transactions whose parts can be dropped from the document.
+ List<String> dones = new ArrayList<String>();
+ for (TransactionPart part: parts) {
+ // NOTE(review): a deleted part stops the loop without looking at the state of its
+ // transaction, so the document is removed even when that transaction is still
+ // running or has failed - confirm that this is intended.
+ if (part.isDeleted()) {
+ deleted = true;
+ break;
+ }
+ TransactionState ts = loadTransaction(part.getTransactionId());
+ if (ts.isDone() || ts.isMissing()) {
+ // Parts following a running transaction are left for a later round.
+ if (!runningfound) {
+ if (ts.isMissing()) {
+ logger.warning(logid,"refers to the missing transaction " + ts.getId() + ", considering it as done");
+ }
+ part.reapply(entdoc);
+ dones.add(ts.getId());
+ }
+ } else if (ts.isFailed()) {
+ // We consider it done so that it is removed
+ dones.add(ts.getId());
+ } else if (ts.isRunning()) {
+ runningfound = true;
+ }
+ }
+
+ // Optimistic lock: the write below only matches the version that was read.
+ BasicDBObject entsearch = new BasicDBObject();
+ entsearch.put("_id", entdoc.get("_id"));
+ // NOTE(review): presumably fails with a NullPointerException when the document has no
+ // "version" field - verify what MongoUtils.convertFromMongo returns for null.
+ int version = (Integer) MongoUtils.convertFromMongo(entdoc.get("version"), Integer.TYPE);
+ entsearch.put("version", version);
+ entdoc.put("version", version + 1);
+
+ WriteResult result = null;
+ MongoCollection entcoll = db.getCollection(colldb, collname);
+ if (deleted) {
+ result = entcoll.remove(entsearch, concern);
+ if (result.getError() != null) {
+ // deleted is always true in this branch, so the message always reads "deleting".
+ logger.error(logid, "Error " + (deleted ? "deleting":"updating") + " : " + result.getError());
+ return false;
+ } else if (result.getN() == 0) {
+ logger.entitiesSkip++;
+ // If we were deleting, we can safely ignore this, maybe someone else deleted it
+ return deleted;
+ } else {
+ logger.entitiesDone++;
+ }
+ } else {
+ // Remove done transaction parts
+ for (Iterator iterator = entTr.iterator(); iterator.hasNext();) {
+ Object entTrPartObj = (Object) iterator.next();
+ if (!(entTrPartObj instanceof BSONObject)) continue;
+ BSONObject entTrPartDoc = (BSONObject)entTrPartObj;
+ String entTrPartTrId = (String) entTrPartDoc.get("__transactionId");
+ if (dones.contains(entTrPartTrId))
+ iterator.remove();
+ }
+
+ // Using normal write concern here .. if it fails for some reason, it will eventually be made consistent again upon read
+ // FIXME does this make sense? It seems to provide no serious improvement
+ // The result of this update is not inspected: the entity is counted as done either way.
+ result = entcoll.update(entsearch, entdoc, false, false, WriteConcern.NORMAL);
+ logger.entitiesDone++;
+ }
+
+ return true;
+
+ }
+
+ /**
+  * Command line entry point: every argument but the last is a server
+  * address, the last one is the database name. Runs the polling loop on the
+  * calling thread. Prints the usage and exits with status 1 when fewer than
+  * two arguments are given.
+  */
+ public static void main(String[] args) throws Exception {
+     if (args.length <= 1) {
+         System.out.println("USAGE : java ..... serverAddress [serverAddress ...] dbname");
+         System.exit(1);
+     }
+     final int last = args.length - 1;
+     List<ServerAddress> servers = new ArrayList<ServerAddress>();
+     for (int i = 0; i < last; i++) {
+         servers.add(new ServerAddress(args[i]));
+     }
+     MongoDB mdb = new MongoDB(new Mongo(servers));
+     new TransactionSecondPhaseThread(mdb, args[last]).run();
+ }
+
+}
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningCollectionWrapper.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningCollectionWrapper.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningCollectionWrapper.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningCollectionWrapper.java Wed May 18 12:46:03 2011
@@ -0,0 +1,74 @@
+package org.apache.magma.database.mongo.collections;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+ /**
+  * {@link Collection} decorator that forwards every call to a wrapped collection and
+  * calls {@link ModifiedListener#modified()} after mutating calls.
+  * <p>
+  * Calls that report whether they changed the collection ({@code add}, {@code remove},
+  * {@code addAll}, {@code removeAll}, {@code retainAll}) notify only when they return
+  * {@code true}; {@code clear()} always notifies. Iterators are wrapped as well, so that
+  * {@code Iterator.remove()} is reported too.
+  *
+  * @param <T> type of the elements
+  */
+ public class ListeningCollectionWrapper<T> implements Collection<T> {
+
+     /** Listener informed about modifications. */
+     protected ModifiedListener ml = null;
+     /** Collection that actually holds the elements. */
+     protected Collection<T> delegate = null;
+
+     /**
+      * @param ml listener to notify when the collection is modified through this wrapper
+      * @param delegate collection to wrap
+      */
+     public ListeningCollectionWrapper(ModifiedListener ml, Collection<T> delegate) {
+         this.ml = ml;
+         this.delegate = delegate;
+     }
+
+     public int size() {
+         return delegate.size();
+     }
+     public boolean isEmpty() {
+         return delegate.isEmpty();
+     }
+     public boolean contains(Object o) {
+         return delegate.contains(o);
+     }
+     /** Returns a listening iterator, so removals made through it are reported. */
+     public Iterator<T> iterator() {
+         return new ListeningIteratorWrapper<T>(ml, delegate.iterator());
+     }
+     public Object[] toArray() {
+         return delegate.toArray();
+     }
+     // The method type parameter is deliberately NOT called T: it is unrelated to the
+     // element type of the collection and would otherwise hide the class type parameter.
+     public <E> E[] toArray(E[] a) {
+         return delegate.toArray(a);
+     }
+     public boolean add(T e) {
+         boolean ret = delegate.add(e);
+         if (ret) ml.modified();
+         return ret;
+     }
+     public boolean remove(Object o) {
+         boolean ret = delegate.remove(o);
+         if (ret) ml.modified();
+         return ret;
+     }
+     public boolean containsAll(Collection<?> c) {
+         return delegate.containsAll(c);
+     }
+     public boolean addAll(Collection<? extends T> c) {
+         boolean ret = delegate.addAll(c);
+         if (ret) ml.modified();
+         return ret;
+     }
+     public boolean removeAll(Collection<?> c) {
+         boolean ret = delegate.removeAll(c);
+         if (ret) ml.modified();
+         return ret;
+     }
+     public boolean retainAll(Collection<?> c) {
+         boolean ret = delegate.retainAll(c);
+         if (ret) ml.modified();
+         return ret;
+     }
+     /** Empties the wrapped collection; the listener is notified unconditionally. */
+     public void clear() {
+         delegate.clear();
+         ml.modified();
+     }
+     /** Equality is the one of the wrapped collection. */
+     public boolean equals(Object o) {
+         return delegate.equals(o);
+     }
+     public int hashCode() {
+         return delegate.hashCode();
+     }
+
+ }
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningIteratorWrapper.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningIteratorWrapper.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningIteratorWrapper.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningIteratorWrapper.java Wed May 18 12:46:03 2011
@@ -0,0 +1,26 @@
+package org.apache.magma.database.mongo.collections;
+
+import java.util.Iterator;
+
+ /**
+  * {@link Iterator} decorator: iteration is forwarded untouched, while a successful
+  * {@code remove()} is reported to the {@link ModifiedListener}.
+  *
+  * @param <T> type of the iterated elements
+  */
+ public class ListeningIteratorWrapper<T> implements Iterator<T> {
+
+     /** Listener informed when an element is removed through this iterator. */
+     protected ModifiedListener ml;
+     /** Iterator doing the real work. */
+     protected Iterator<T> delegate;
+
+     public ListeningIteratorWrapper(ModifiedListener ml, Iterator<T> delegate) {
+         this.delegate = delegate;
+         this.ml = ml;
+     }
+
+     public boolean hasNext() {
+         return this.delegate.hasNext();
+     }
+
+     public T next() {
+         return this.delegate.next();
+     }
+
+     public void remove() {
+         // Notify only after the delegate accepted the removal (it may throw)
+         this.delegate.remove();
+         this.ml.modified();
+     }
+
+ }
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningListIteratorWrapper.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningListIteratorWrapper.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningListIteratorWrapper.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningListIteratorWrapper.java Wed May 18 12:46:03 2011
@@ -0,0 +1,41 @@
+package org.apache.magma.database.mongo.collections;
+
+import java.util.Iterator;
+import java.util.ListIterator;
+
+ /**
+  * {@link ListIterator} flavour of {@link ListeningIteratorWrapper}: navigation calls are
+  * forwarded as they are, {@code set} and {@code add} additionally notify the listener.
+  *
+  * @param <T> type of the iterated elements
+  */
+ public class ListeningListIteratorWrapper<T> extends ListeningIteratorWrapper<T> implements ListIterator<T> {
+
+     /** The same iterator handed to the superclass, kept with its ListIterator type. */
+     private ListIterator<T> ldelegate;
+
+     public ListeningListIteratorWrapper(ModifiedListener ml, ListIterator<T> delegate) {
+         super(ml, delegate);
+         ldelegate = delegate;
+     }
+
+     public boolean hasPrevious() {
+         return this.ldelegate.hasPrevious();
+     }
+
+     public T previous() {
+         return this.ldelegate.previous();
+     }
+
+     public int nextIndex() {
+         return this.ldelegate.nextIndex();
+     }
+
+     public int previousIndex() {
+         return this.ldelegate.previousIndex();
+     }
+
+     public void set(T e) {
+         this.ldelegate.set(e);
+         this.ml.modified();
+     }
+
+     public void add(T e) {
+         this.ldelegate.add(e);
+         this.ml.modified();
+     }
+
+ }
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningListWrapper.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningListWrapper.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningListWrapper.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningListWrapper.java Wed May 18 12:46:03 2011
@@ -0,0 +1,63 @@
+package org.apache.magma.database.mongo.collections;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.ListIterator;
+
+ /**
+  * {@link List} flavour of {@link ListeningCollectionWrapper}. Positional mutators notify
+  * the listener, and list iterators and sub lists are wrapped so that changes made
+  * through them are reported to the same listener.
+  *
+  * @param <T> type of the elements
+  */
+ public class ListeningListWrapper<T> extends ListeningCollectionWrapper<T> implements List<T> {
+
+     /** The same list handed to the superclass, kept with its List type. */
+     private List<T> ldelegate;
+
+     public ListeningListWrapper(ModifiedListener ml, List<T> delegate) {
+         super(ml, delegate);
+         ldelegate = delegate;
+     }
+
+     public boolean addAll(int index, Collection<? extends T> c) {
+         boolean changed = ldelegate.addAll(index, c);
+         if (changed) {
+             ml.modified();
+         }
+         return changed;
+     }
+
+     public T get(int index) {
+         return ldelegate.get(index);
+     }
+
+     public T set(int index, T element) {
+         T previous = ldelegate.set(index, element);
+         ml.modified();
+         return previous;
+     }
+
+     public void add(int index, T element) {
+         ldelegate.add(index, element);
+         ml.modified();
+     }
+
+     public T remove(int index) {
+         T removed = ldelegate.remove(index);
+         ml.modified();
+         return removed;
+     }
+
+     public int indexOf(Object o) {
+         return ldelegate.indexOf(o);
+     }
+
+     public int lastIndexOf(Object o) {
+         return ldelegate.lastIndexOf(o);
+     }
+
+     public ListIterator<T> listIterator() {
+         ListIterator<T> inner = ldelegate.listIterator();
+         return new ListeningListIteratorWrapper<T>(ml, inner);
+     }
+
+     public ListIterator<T> listIterator(int index) {
+         ListIterator<T> inner = ldelegate.listIterator(index);
+         return new ListeningListIteratorWrapper<T>(ml, inner);
+     }
+
+     public List<T> subList(int fromIndex, int toIndex) {
+         List<T> view = ldelegate.subList(fromIndex, toIndex);
+         return new ListeningListWrapper<T>(ml, view);
+     }
+
+ }
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningMapWrapper.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningMapWrapper.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningMapWrapper.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningMapWrapper.java Wed May 18 12:46:03 2011
@@ -0,0 +1,79 @@
+package org.apache.magma.database.mongo.collections;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+ /**
+  * {@link Map} decorator that forwards every call to a wrapped map and calls
+  * {@link ModifiedListener#modified()} after each mutating call ({@code put},
+  * {@code remove}, {@code putAll}, {@code clear}).
+  * <p>
+  * The key set, value collection and entry set views are wrapped too, so that removals
+  * made through them are reported. Entries obtained by iterating the entry set are
+  * wrapped as well, because {@link java.util.Map.Entry#setValue(Object)} writes through
+  * to the map and would otherwise change it without the listener being told.
+  *
+  * @param <K> type of the keys
+  * @param <V> type of the values
+  */
+ public class ListeningMapWrapper<K,V> implements Map<K,V> {
+
+     /** Listener informed about modifications. */
+     private ModifiedListener ml;
+     /** Map that actually holds the mappings. */
+     private Map<K,V> delegate;
+
+     /**
+      * @param ml listener to notify when the map is modified through this wrapper
+      * @param delegate map to wrap
+      */
+     public ListeningMapWrapper(ModifiedListener ml, Map<K, V> delegate) {
+         this.ml = ml;
+         this.delegate = delegate;
+     }
+
+     public int size() {
+         return delegate.size();
+     }
+
+     public boolean isEmpty() {
+         return delegate.isEmpty();
+     }
+
+     public boolean containsKey(Object key) {
+         return delegate.containsKey(key);
+     }
+
+     public boolean containsValue(Object value) {
+         return delegate.containsValue(value);
+     }
+
+     public V get(Object key) {
+         return delegate.get(key);
+     }
+
+     public V put(K key, V value) {
+         V ret = delegate.put(key, value);
+         ml.modified();
+         return ret;
+     }
+
+     public V remove(Object key) {
+         V ret = delegate.remove(key);
+         ml.modified();
+         return ret;
+     }
+
+     public void putAll(Map<? extends K, ? extends V> m) {
+         delegate.putAll(m);
+         ml.modified();
+     }
+
+     public void clear() {
+         delegate.clear();
+         ml.modified();
+     }
+
+     public Set<K> keySet() {
+         return new ListeningSetWrapper<K>(ml, delegate.keySet());
+     }
+
+     public Collection<V> values() {
+         return new ListeningCollectionWrapper<V>(ml, delegate.values());
+     }
+
+     /**
+      * Returns a listening view of the entries. Entries handed out by its iterator
+      * notify the listener when their value is replaced.
+      */
+     public Set<java.util.Map.Entry<K, V>> entrySet() {
+         return new ListeningSetWrapper<java.util.Map.Entry<K, V>>(ml, delegate.entrySet()) {
+             public java.util.Iterator<java.util.Map.Entry<K, V>> iterator() {
+                 // super.iterator() already reports remove(); only next() needs decorating
+                 final java.util.Iterator<java.util.Map.Entry<K, V>> listening = super.iterator();
+                 return new java.util.Iterator<java.util.Map.Entry<K, V>>() {
+                     public boolean hasNext() {
+                         return listening.hasNext();
+                     }
+                     public java.util.Map.Entry<K, V> next() {
+                         return new ListeningEntry(listening.next());
+                     }
+                     public void remove() {
+                         listening.remove();
+                     }
+                 };
+             }
+         };
+     }
+
+     public boolean equals(Object o) {
+         return delegate.equals(o);
+     }
+
+     public int hashCode() {
+         return delegate.hashCode();
+     }
+
+     /**
+      * Entry decorator reporting {@code setValue} to the listener of the enclosing map.
+      * Equality, hash code and string form are those of the wrapped entry.
+      */
+     private class ListeningEntry implements java.util.Map.Entry<K, V> {
+
+         private final java.util.Map.Entry<K, V> entry;
+
+         ListeningEntry(java.util.Map.Entry<K, V> entry) {
+             this.entry = entry;
+         }
+
+         public K getKey() {
+             return entry.getKey();
+         }
+
+         public V getValue() {
+             return entry.getValue();
+         }
+
+         public V setValue(V value) {
+             V previous = entry.setValue(value);
+             ml.modified();
+             return previous;
+         }
+
+         public boolean equals(Object o) {
+             return entry.equals(o);
+         }
+
+         public int hashCode() {
+             return entry.hashCode();
+         }
+
+         public String toString() {
+             return entry.toString();
+         }
+     }
+
+ }
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningSetWrapper.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningSetWrapper.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningSetWrapper.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ListeningSetWrapper.java Wed May 18 12:46:03 2011
@@ -0,0 +1,14 @@
+package org.apache.magma.database.mongo.collections;
+
+import java.util.Set;
+
+ /**
+  * {@link Set} flavour of {@link ListeningCollectionWrapper}; it adds no behaviour of its
+  * own, all operations are the inherited ones.
+  *
+  * @param <T> type of the elements
+  */
+ public class ListeningSetWrapper<T> extends ListeningCollectionWrapper<T> implements Set<T> {
+
+     /** The same set handed to the superclass, kept with its Set type. */
+     protected Set<T> sdelegate;
+
+     public ListeningSetWrapper(ModifiedListener ml, Set<T> delegate) {
+         super(ml, delegate);
+         sdelegate = delegate;
+     }
+
+ }
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ModifiedListener.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ModifiedListener.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ModifiedListener.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/mongo/collections/ModifiedListener.java Wed May 18 12:46:03 2011
@@ -0,0 +1,7 @@
+package org.apache.magma.database.mongo.collections;
+
+ /**
+  * Callback used by the listening wrappers of this package to report that the
+  * wrapped collection, iterator or map has been modified.
+  */
+ public interface ModifiedListener {
+
+     /** Called after a modification has been applied. */
+     void modified();
+
+ }
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/openjpa/InstallVersionByDefault.aj
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/openjpa/InstallVersionByDefault.aj?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/openjpa/InstallVersionByDefault.aj (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/openjpa/InstallVersionByDefault.aj Wed May 18 12:46:03 2011
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.magma.database.openjpa;
+
+import javax.persistence.GeneratedValue;
+import javax.persistence.Version;
+import javax.persistence.Entity;
+
+import org.apache.magma.beans.MagReadOnly;
+
+/**
+ * Provides by default a version field to those entities missing it.
+ *
+ * @author Simone Gianni <si...@apache.org>
+ */
+
+ public aspect InstallVersionByDefault {
+
+ /**
+ * Marker interface for beans receiving the default version field.
+ *
+ * @author Simone Gianni <si...@apache.org>
+ */
+ public static interface WithDefaultVersion {
+
+ public long getVersion();
+ public void setVersion(long newVersion);
+
+ }
+
+ // Makes a type implement WithDefaultVersion when it is annotated with @Entity, is not
+ // annotated with @NoVersion, and declares neither a public get* method annotated with
+ // @Version nor a field annotated with @Version.
+ declare parents : ((@Entity *) && !(@NoVersion *) && !hasmethod(@Version public * get*()) && !hasfield(@Version * *)) implements WithDefaultVersion;
+ // Alternative kept for reference: same rule without the check for an existing @Version member.
+ //declare parents : ((@Entity *) && !(@NoVersion *)) implements WithDefaultVersion;
+
+ /**
+ * The default version field.
+ */
+ private long WithDefaultVersion.version;
+
+ /**
+ * Getter for the default version field.
+ * @return The version of the bean.
+ */
+ @Version
+ @MagReadOnly
+ public long WithDefaultVersion.getVersion() {
+ return version;
+ }
+
+ /**
+ * Setter for the default version field.
+ * @param newVersion The new version of the bean.
+ */
+ public void WithDefaultVersion.setVersion(long newVersion) {
+ version = newVersion;
+ }
+
+
+ }
Added: labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/openjpa/NoVersion.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/openjpa/NoVersion.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/openjpa/NoVersion.java (added)
+++ labs/magma/trunk/database-mongodb/src/main/java/org/apache/magma/database/openjpa/NoVersion.java Wed May 18 12:46:03 2011
@@ -0,0 +1,21 @@
+package org.apache.magma.database.openjpa;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.persistence.Entity;
+import javax.persistence.Version;
+
+/**
+ * Use this annotation on {@link Entity} beans that does not have a {@link Version} and should
+ * not receive the default one.
+ *
+ * @author Simone Gianni <si...@apache.org>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface NoVersion {
+
+}
Added: labs/magma/trunk/database-mongodb/src/main/resources/META-INF/magma.default.properties
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/resources/META-INF/magma.default.properties?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/resources/META-INF/magma.default.properties (added)
+++ labs/magma/trunk/database-mongodb/src/main/resources/META-INF/magma.default.properties Wed May 18 12:46:03 2011
@@ -0,0 +1,15 @@
+#Licensed to the Apache Software Foundation (ASF) under one or more
+#contributor license agreements. See the NOTICE file distributed with
+#this work for additional information regarding copyright ownership.
+#The ASF licenses this file to You under the Apache License, Version 2.0
+#(the "License"); you may not use this file except in compliance with
+#the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+org.apache.magma.conversion.Converter.Amongodatabase=org.apache.magma.database.mongo.MongoDatabaseConverter
Added: labs/magma/trunk/database-mongodb/src/main/resources/META-INF/magma.devel.default.properties
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/main/resources/META-INF/magma.devel.default.properties?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/main/resources/META-INF/magma.devel.default.properties (added)
+++ labs/magma/trunk/database-mongodb/src/main/resources/META-INF/magma.devel.default.properties Wed May 18 12:46:03 2011
@@ -0,0 +1 @@
+mongodb.clientThread=true
\ No newline at end of file
Added: labs/magma/trunk/database-mongodb/src/test/ci/META-INF/magma.properties
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/ci/META-INF/magma.properties?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/ci/META-INF/magma.properties (added)
+++ labs/magma/trunk/database-mongodb/src/test/ci/META-INF/magma.properties Wed May 18 12:46:03 2011
@@ -0,0 +1,8 @@
+mongodb.host=mongodb1
+mongodb.defaultDb=ci
+mongodb.clientThread=false
+threads=5
+batch=10
+beans=5000
+iterations=10
+timeout=120000
\ No newline at end of file
Added: labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/Bean.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/Bean.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/Bean.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/Bean.java Wed May 18 12:46:03 2011
@@ -0,0 +1,28 @@
+package org.apache.magma.database.mongo;
+
+import javax.persistence.Entity;
+
+import org.apache.magma.beans.MagmaBean;
+
+ /**
+  * Minimal entity used by the integration run: a name plus a counter.
+  */
+ @MagmaBean
+ @Entity
+ public class Bean {
+
+     private String name;
+     private long counter;
+
+     /** @return the name of this bean */
+     public String getName() {
+         return this.name;
+     }
+
+     /** @param name the new name of this bean */
+     public void setName(String name) {
+         this.name = name;
+     }
+
+     /** @return the current counter value */
+     public long getCounter() {
+         return this.counter;
+     }
+
+     /** @param counter the new counter value */
+     public void setCounter(long counter) {
+         this.counter = counter;
+     }
+
+ }
Added: labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/IntegrationRun.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/IntegrationRun.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/IntegrationRun.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/IntegrationRun.java Wed May 18 12:46:03 2011
@@ -0,0 +1,94 @@
+package org.apache.magma.database.mongo;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+import junit.framework.AssertionFailedError;
+
+import org.apache.magma.basics.MagmaException;
+import org.apache.magma.basics.startup.Cycle;
+import org.apache.magma.database.Database;
+import org.apache.magma.settings.Settings;
+import org.junit.Test;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.Mongo;
+import com.mongodb.WriteConcern;
+
+
+ /**
+ * Multi-threaded integration scenario: empties the "ci" collections, starts one
+ * TransactionSecondPhaseThread plus the configured number of WorkThread instances, waits for
+ * the work group to finish and for the second phase thread to drain, then checks the
+ * resulting bean counters and that no transaction document is left.
+ * The "threads", "beans", "iterations" and "mongodb.defaultDb" values are read from Settings.
+ */
+ public class IntegrationRun {
+
+ @Test
+ public void threadLauncher() throws Exception {
+ System.out.println("Starting");
+ // Step 1: start from empty collections
+ Cycle.start();
+ Database db = new Database();
+ // NOTE(review): the cast presumes Database is made a MongoDatabase elsewhere (e.g. by an aspect) - confirm
+ MongoDB mdb = ((MongoDatabase)db).getDb();
+ mdb.getCollection("ci", "bean").remove(new BasicDBObject(), WriteConcern.SAFE);
+ mdb.getCollection("ci", "transactions").remove(new BasicDBObject(), WriteConcern.SAFE);
+ mdb.getCollection("ci", "doneTransactions").remove(new BasicDBObject(), WriteConcern.SAFE);
+ System.out.println("Sleeping to have mongodb delete objects");
+ Thread.sleep(5000);
+ Cycle.stop();
+
+ // Step 2: one background second phase thread, run as a daemon
+ System.out.println("Starting cleanup thread");
+ TransactionSecondPhaseThread spt = new TransactionSecondPhaseThread(mdb, Settings.get("mongodb.defaultDb"));
+ spt.setSleepPeriod(10);
+ spt.setBackupTransactions(true);
+ Thread sp = new Thread(spt);
+ sp.setDaemon(true);
+ sp.start();
+
+ int threads = Integer.parseInt(Settings.get("threads"));
+ int beans = Integer.parseInt(Settings.get("beans"));
+
+ WorkGroup wg = new WorkGroup(beans);
+
+ long start = System.currentTimeMillis();
+
+ // Step 3: launch the workers, all sharing the same WorkGroup
+ for (int i = 0; i < threads; i++) {
+ WorkThread t = new WorkThread(wg, "Thread" + i);
+ t.start();
+
+ /*
+ TransactionSecondPhaseThread spt2 = new TransactionSecondPhaseThread(mdb, Settings.get("mongodb.defaultDb"));
+ spt2.setSleepPeriod(10);
+ spt2.setBackupTransactions(true);
+ Thread sp2 = new Thread(spt2);
+ sp2.setDaemon(true);
+ sp2.start();
+ */
+ }
+
+ // Initial pause before the first isFinished() poll, presumably so that the workers
+ // have registered themselves as running - confirm
+ Thread.sleep(1000);
+ while (!wg.isFinished()) {
+ Thread.sleep(1000);
+ System.out.println(wg.toString());
+ }
+
+ System.out.println("Finished, took " + (System.currentTimeMillis() - start));
+
+ // Step 4: wait (at most about 600 one-second polls) until the second phase logger
+ // reports neither done nor waiting entries
+ {
+ int cnt = 0;
+ while (spt.getLogger().done != 0 || spt.getLogger().waiting != 0) {
+ Thread.sleep(1000);
+ cnt++;
+ if (cnt > 600) fail("Too much time to recollect transactions!");
+ }
+ }
+
+ System.out.println("Sanity check");
+
+ int iterations = Integer.parseInt(Settings.get("iterations"));
+
+ // NOTE(review): beancoll is never used afterwards - candidate for removal
+ MongoCollection beancoll = mdb.getCollection("ci", "bean");
+ Cycle.start();
+ db = new Database();
+ // Every bean must have reached the configured counter value, none may be below it.
+ // NOTE(review): equalTo(..) compares with equals(); verify count(..) returns the same
+ // boxed type as the expected value (Integer here)
+ assertThat(db.count(Bean.class, "{\"counter\":\"$1\"}", iterations), equalTo(beans));
+ assertThat(db.count(Bean.class, "{\"counter\": {\"$lt\":\"$1\"}}", iterations), equalTo(0));
+ Cycle.stop();
+
+ // No transaction document may be left behind
+ assertThat(mdb.getCollection("ci", "transactions").count(new BasicDBObject()), equalTo(0));
+
+ }
+
+ }
Added: labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/WorkGroup.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/WorkGroup.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/WorkGroup.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/WorkGroup.java Wed May 18 12:46:03 2011
@@ -0,0 +1,121 @@
+package org.apache.magma.database.mongo;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class WorkGroup {
+
+ private static Random rnd = new Random();
+
+ private List<String> beansid = new ArrayList<String>();
+ private ReadWriteLock beansLock = new ReentrantReadWriteLock();
+ private volatile int createdBeans;
+ private int maxbeans;
+
+ public WorkGroup(int maxbeans) {
+ this.maxbeans = maxbeans;
+ }
+
+ public boolean needMoreBeans() {
+ return createdBeans < maxbeans;
+ }
+
+ public synchronized int creating(int quantity) {
+ if (createdBeans + quantity > maxbeans) {
+ quantity = maxbeans - createdBeans;
+ }
+ createdBeans += quantity;
+ return quantity;
+ }
+
+ public void createdIds(Set<String> ids) {
+ Lock wl = beansLock.writeLock();
+ wl.lock();
+ try {
+ beansid.addAll(ids);
+ } finally {
+ wl.unlock();
+ }
+ }
+
+ public synchronized void notCreated(int createds) {
+ createdBeans -= createds;
+ }
+
+ public Set<String> getRandomIds(int quantity) {
+ if (quantity > createdBeans) quantity = createdBeans / 2;
+ Set<String> ret = new HashSet<String>(quantity);
+ if (quantity == 0) return ret;
+ Lock rl = beansLock.readLock();
+ rl.lock();
+ try {
+ while (ret.size() < quantity) {
+ ret.add(beansid.get(rnd.nextInt(beansid.size())));
+ }
+ } finally {
+ rl.unlock();
+ }
+ return ret;
+ }
+
+
+ private volatile int running;
+ private volatile int timedouts;
+ private volatile int completeds;
+ private volatile int optlocks;
+ private volatile int errors;
+
+ public void running(String id) {
+ running++;
+ }
+
+ public void timedOut(String id) {
+ timedouts++;
+ running--;
+ }
+
+ public void completed(String id) {
+ completeds++;
+ running--;
+ }
+ public void inError(String myid) {
+ errors++;
+ running--;
+ }
+
+ public boolean isFinished() {
+ return running == 0;
+ }
+
+ public void optlock() {
+ optlocks++;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder ret = new StringBuilder();
+
+ ret.append("Work group situation:\n");
+ ret.append(" running:" + running);
+ ret.append("\n");
+ ret.append(" beans:" + createdBeans);
+ ret.append("\n");
+ ret.append(" timedout:" + timedouts);
+ ret.append("\n");
+ ret.append(" errors:" + errors);
+ ret.append("\n");
+ ret.append(" completed:" + completeds);
+ ret.append("\n");
+ ret.append(" optlocks:" + optlocks);
+ ret.append("\n");
+
+ return ret.toString();
+ }
+
+}
Added: labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/WorkThread.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/WorkThread.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/WorkThread.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/ci/org/apache/magma/database/mongo/WorkThread.java Wed May 18 12:46:03 2011
@@ -0,0 +1,146 @@
+package org.apache.magma.database.mongo;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import javax.persistence.OptimisticLockException;
+
+import org.apache.magma.basics.startup.Cycle;
+import org.apache.magma.database.Database;
+import org.apache.magma.settings.Settings;
+
+public class WorkThread extends Thread {
+
+ private WorkGroup group;
+ private String myid;
+ private Random rnd = new Random();
+
+ public WorkThread(WorkGroup group, String id) {
+ this.group = group;
+ this.myid = id;
+ }
+
+ int batch = Integer.parseInt(Settings.get("batch"));
+ int iterations = Integer.parseInt(Settings.get("iterations"));
+ long timeout = Long.parseLong(Settings.get("timeout"));
+ boolean usequery = Settings.get("usequery") != null && Settings.get("usequery").equalsIgnoreCase("true");
+
+ long start = System.currentTimeMillis();
+
+ long mycnt = 0;
+
+ private Collection<Bean> findBeans() {
+ if (usequery) {
+ List<Bean> query = new Database().query(Bean.class, "{\"counter\": {\"$lt\": \"$1\"}}", iterations);
+ Set<Bean> ret = new HashSet<Bean>(batch);
+ int cnt = 0;
+ for (Bean bean : query) {
+ cnt++;
+ if (rnd.nextDouble() > 0.8) {
+ ret.add(bean);
+ }
+ if (ret.size() > batch || cnt > batch * 5) break;
+ }
+ return ret;
+ } else {
+ Set<String> ids = group.getRandomIds(batch);
+ Database db = new Database();
+ Set<Bean> ret = new HashSet<Bean>(ids.size());
+ for (String id : ids) {
+ Bean b = db.load(Bean.class, id);
+ if (b != null) ret.add(b);
+ }
+ return ret;
+ }
+ }
+
+ @Override
+ public void run() {
+ group.running(myid);
+ int terminated = 0;
+ while (true) {
+ if (System.currentTimeMillis() > start + timeout) {
+ group.timedOut(myid);
+ return;
+ }
+ int dones = 0;
+ int createrequest = 0;
+ Set<String> createds = new HashSet<String>();
+ Cycle.start();
+ try {
+ Database db = new Database();
+
+ if (group.needMoreBeans()) {
+ int tocreate = group.creating(batch);
+ createrequest = tocreate;
+ while (tocreate > 0 && createds.size() < batch) {
+ Bean b = new Bean();
+ b.setName(myid + " new bean " + (mycnt++));
+ db.save(b);
+ createds.add(MongoUtils.getMongoId(b));
+ tocreate--;
+ dones++;
+ }
+ group.notCreated(tocreate);
+ group.createdIds(createds);
+ } else {
+ for (int cnt = 0; cnt < 10; cnt++) {
+ Collection<Bean> missing = findBeans();
+ for (Bean bean : missing) {
+ if (bean.getCounter() < iterations) {
+ bean.setCounter(bean.getCounter() + 1);
+ db.save(bean);
+ dones++;
+ }
+ if (dones >= batch) break;
+ }
+ if (dones >= batch) break;
+ }
+ }
+ } catch (Exception e) {
+ Cycle.stopForError();
+ e.printStackTrace();
+ if (createds.size() > 0) {
+ group.notCreated(createrequest);
+ }
+ } finally {
+ try {
+ Cycle.get().teardownMongoDatabase();
+ } catch (Throwable e) {
+ if (e instanceof OptimisticLockException) {
+ group.optlock();
+ } else {
+ Throwable cause = e.getCause();
+ if (cause instanceof OptimisticLockException) {
+ group.optlock();
+ } else {
+ group.inError(myid);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ Cycle.stop();
+ }
+ if (dones == 0) {
+ terminated++;
+ if (terminated > 10) {
+ group.completed(myid);
+ break;
+ }
+ try {
+ sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ } else {
+ terminated = 0;
+ }
+ }
+ }
+
+
+}
Added: labs/magma/trunk/database-mongodb/src/test/java/META-INF/magma.test.properties
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/java/META-INF/magma.test.properties?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/java/META-INF/magma.test.properties (added)
+++ labs/magma/trunk/database-mongodb/src/test/java/META-INF/magma.test.properties Wed May 18 12:46:03 2011
@@ -0,0 +1 @@
+mongodb.clientThread=false
Added: labs/magma/trunk/database-mongodb/src/test/java/com/mongodb/DummyResultFactory.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/java/com/mongodb/DummyResultFactory.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/java/com/mongodb/DummyResultFactory.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/java/com/mongodb/DummyResultFactory.java Wed May 18 12:46:03 2011
@@ -0,0 +1,19 @@
+package com.mongodb;
+
+ /**
+  * Builds {@link WriteResult} instances from hand-made {@link CommandResult} documents,
+  * for use in tests.
+  */
+ public class DummyResultFactory {
+
+     /**
+      * @param n value stored under the "n" key
+      * @return a result whose command document has "ok" set to 1
+      */
+     public static WriteResult ok(int n) {
+         return build(1, "n", n);
+     }
+
+     /**
+      * @param msg value stored under the "errmsg" key
+      * @return a result whose command document has "ok" set to 0
+      */
+     public static WriteResult error(String msg) {
+         return build(0, "errmsg", msg);
+     }
+
+     /** Creates the command document ("ok" first, then the extra entry) and wraps it. */
+     private static WriteResult build(int okFlag, String extraKey, Object extraValue) {
+         CommandResult reply = new CommandResult();
+         reply.put("ok", okFlag);
+         reply.put(extraKey, extraValue);
+         return new WriteResult(reply, WriteConcern.SAFE);
+     }
+
+ }
Added: labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/BSONPieces.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/BSONPieces.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/BSONPieces.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/BSONPieces.java Wed May 18 12:46:03 2011
@@ -0,0 +1,71 @@
+package org.apache.magma.database.mongo;
+
+import org.apache.magma.database.mongo.test.domain.City;
+import org.apache.magma.database.mongo.test.domain.Person;
+import org.apache.magma.database.mongo.test.utils.MongoTestCursor;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+import com.mongodb.DBObject;
+import com.mongodb.util.JSON;
+
+/**
+ * Shared BSON fixture documents for the mongo tests, with assertions for the entities loaded from them.
+ */
+public class BSONPieces {
+    /** Document ids as hex strings; the assertions below parse them in base 16 to compare with entity ids. */
+    public static final String simplePersonId = "2296e47076120002";
+    public static final String complexPersonId = "2296e47076120012";
+    public static final String cityId = "2296e47076130001";
+
+    /** Person document with scalar fields only. The instance is shared, so give tests a copy rather than this object. */
+    public static final DBObject simplePerson = (DBObject) JSON.parse(
+        "{ \"_id\" : \"" + simplePersonId + "\"," +
+        " \"_jcl\" : \"Person\"," +
+        " \"version\": 1," +
+        " \"name\" : \"Simone\"," +
+        " \"birthday\": \"1979-03-05\"," +
+        " \"intelligence\" : 100" +
+        "}");
+    /** Asserts that id, name and qi of {@code p} match {@link #simplePerson} (qi presumably maps the "intelligence" field). */
+    public static void assertSimplePersonLoaded(Person p) {
+        assertThat(p.getId(), equalTo(Long.parseLong(simplePersonId, 16)));
+        assertThat(p.getName(), equalTo("Simone"));
+        assertThat(p.getQi(), equalTo(100L));
+    }
+
+    /** Person document that additionally lists two address ids and one friend id (that of simplePerson). */
+    public static final DBObject complexPerson = (DBObject) JSON.parse(
+        "{ \"_id\" : \"" + complexPersonId + "\"," +
+        " \"_jcl\" : \"Person\"," +
+        " \"version\": 1," +
+        " \"name\" : \"Simone\"," +
+        " \"intelligence\" : 100," +
+        " \"addresses\" : [ \"2296e47076120004\", \"2296e47076120005\"], " +
+        " \"friends\" : [ \"" + simplePersonId + "\" ]" +
+        "}");
+    /** Asserts that id, name and qi of {@code p} match {@link #complexPerson}; relations are not checked. The method name keeps its original spelling for existing callers. */
+    public static void assertComplesPersonLoadedFields(Person p) {
+        assertThat(p.getId(), equalTo(Long.parseLong(complexPersonId, 16)));
+        assertThat(p.getName(), equalTo("Simone"));
+        assertThat(p.getQi(), equalTo(100L));
+    }
+
+    /** City document with a one-element "notes" array. */
+    public static final DBObject city = (DBObject) JSON.parse(
+        "{ \"_id\" : \"" + cityId + "\"," +
+        " \"_jcl\" : \"City\"," +
+        " \"version\": 1," +
+        " \"name\" : \"Roma\"," +
+        " \"notes\" : [\"Beautiful monuments, horrible taffic jams\"]" +
+        "}");
+    /** Asserts that {@code c} is non-null and that its id, name and single note match {@link #city}. */
+    public static void assertCityLoaded(City c) {
+        assertThat(c, notNullValue());
+        assertThat(c.getId(), equalTo(Long.parseLong(cityId, 16)));
+        assertThat(c.getName(), equalTo("Roma"));
+        assertThat(c.getNotes().size(), equalTo(1));
+        assertThat(c.getNotes().get(0), equalTo("Beautiful monuments, horrible taffic jams"));
+    }
+}
Added: labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/BasicDirtyTest.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/BasicDirtyTest.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/BasicDirtyTest.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/BasicDirtyTest.java Wed May 18 12:46:03 2011
@@ -0,0 +1,130 @@
+package org.apache.magma.database.mongo;
+
+import static org.apache.magma.database.mongo.test.utils.DBObjectMatcher.dbObject;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+import static org.junit.matchers.JUnitMatchers.*;
+
+import java.util.Set;
+
+import javax.persistence.CascadeType;
+
+import org.apache.magma.database.mongo.test.domain.Address;
+import org.apache.magma.database.mongo.test.domain.Person;
+import org.apache.magma.database.mongo.test.domain.ToDo;
+import org.junit.Test;
+
+
+/** Checks the dirty / fromDb / onDb flags of entities that are new, modified, or loaded through the database. */
+public class BasicDirtyTest extends MongoDbTestBase {
+    /** New entities start dirty and off the db; clearDirty() resets the flag, setters and relation changes raise it again. */
+    @Test
+    public void dirty() throws Exception {
+        Person p = new Person();
+        assertThat(p.isDirty(), equalTo(true));
+        assertThat(p.isFromDb(), equalTo(false));
+        assertThat(p.isOnDb(), equalTo(false));
+        p.clearDirty();
+        assertThat(p.isDirty(), equalTo(false));
+        assertThat(p.isFromDb(), equalTo(false));
+        assertThat(p.isOnDb(), equalTo(false));
+
+        p.setName("Simone");
+
+        assertThat(p.isDirty(), equalTo(true));
+        assertThat(p.isFromDb(), equalTo(false));
+        assertThat(p.isOnDb(), equalTo(false));
+        // A replacement instance goes through the same checks and is left clean for the relation checks below.
+        p = new Person();
+        assertThat(p.isDirty(), equalTo(true));
+        assertThat(p.isFromDb(), equalTo(false));
+        assertThat(p.isOnDb(), equalTo(false));
+        p.clearDirty();
+        assertThat(p.isDirty(), equalTo(false));
+        assertThat(p.isFromDb(), equalTo(false));
+        assertThat(p.isOnDb(), equalTo(false));
+        // p1 is a separate entity, so every assertion in this stanza reads p1's own flags.
+        Person p1 = new Person();
+        assertThat(p1.isDirty(), equalTo(true));
+        assertThat(p1.isFromDb(), equalTo(false));
+        assertThat(p1.isOnDb(), equalTo(false));
+        p1.clearDirty();
+        assertThat(p1.isDirty(), equalTo(false));
+        assertThat(p1.isFromDb(), equalTo(false));
+        assertThat(p1.isOnDb(), equalTo(false));
+        // Fetching the friends collection must leave the owner clean...
+        Set<Person> fr = p.getFriends();
+        assertThat(p.isDirty(), equalTo(false));
+        assertThat(p.isFromDb(), equalTo(false));
+        assertThat(p.isOnDb(), equalTo(false));
+        // ...while adding an element to it marks the owner dirty.
+        fr.add(p1);
+        assertThat(p.isDirty(), equalTo(true));
+        assertThat(p.isFromDb(), equalTo(false));
+        assertThat(p.isOnDb(), equalTo(false));
+    }
+    /** An entity loaded through db.load() is flagged fromDb/onDb and clean; a setter then makes it dirty. */
+    @Test
+    public void fromDb() throws Exception {
+        mdb.getCollection("test", "person")
+            .expectFindOne(dbObject("{'_id':'" + BSONPieces.simplePersonId + "'}"), cloneDbo(BSONPieces.simplePerson));
+
+        Person p = db.load(Person.class, BSONPieces.simplePersonId);
+
+        mdb.checkAll();
+        BSONPieces.assertSimplePersonLoaded(p);
+
+        assertThat(p.isFromDb(), equalTo(true));
+        assertThat(p.isOnDb(), equalTo(true));
+        assertThat(p.isDirty(), equalTo(false));
+        // Changing a loaded entity makes it dirty without altering its db flags.
+        p.setName("Test");
+
+        assertThat(p.isFromDb(), equalTo(true));
+        assertThat(p.isOnDb(), equalTo(true));
+        assertThat(p.isDirty(), equalTo(true));
+    }
+    /** findMainEntities(PERSIST) returns the person plus the entities added to its relations; none of them is flagged as on/from db. */
+    @Test
+    public void loadedsAndMainEntities() throws Exception {
+        Person p = new Person();
+        Address a = new Address();
+        p.getAddresses().add(a);
+
+        assertThat(p.isLoaded("addresses"), equalTo(true));
+
+        Set<MongoEntity> persist = p.findMainEntities(CascadeType.PERSIST);
+
+        assertThat(persist.size(), equalTo(2));
+        assertThat(persist.contains(p), equalTo(true));
+        assertThat(persist.contains(a), equalTo(true));
+        // Collecting the main entities leaves the db flags of both entities unset.
+        assertThat(p.isFromDb(), equalTo(false));
+        assertThat(p.isOnDb(), equalTo(false));
+        assertThat(a.isFromDb(), equalTo(false));
+        assertThat(a.isOnDb(), equalTo(false));
+        // A ToDo linked in both directions is reported as a third main entity.
+        ToDo td = new ToDo();
+        p.getTodos().add(td);
+
+        td.setOwner(p);
+        td.setText("Make MongoDB work");
+
+        assertThat(p.isLoaded("todos"), equalTo(true));
+
+        persist = p.findMainEntities(CascadeType.PERSIST);
+
+        assertThat(persist.size(), equalTo(3));
+        assertThat(persist.contains(p), equalTo(true));
+        assertThat(persist.contains(a), equalTo(true));
+        assertThat(persist.contains(td), equalTo(true));
+
+        assertThat(p.isFromDb(), equalTo(false));
+        assertThat(p.isOnDb(), equalTo(false));
+        assertThat(a.isFromDb(), equalTo(false));
+        assertThat(a.isOnDb(), equalTo(false));
+        assertThat(td.isFromDb(), equalTo(false));
+        assertThat(td.isOnDb(), equalTo(false));
+    }
+
+}
Added: labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/JpaParsingTest.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/JpaParsingTest.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/JpaParsingTest.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/JpaParsingTest.java Wed May 18 12:46:03 2011
@@ -0,0 +1,68 @@
+package org.apache.magma.database.mongo;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+import java.util.List;
+
+import javax.persistence.CascadeType;
+
+import org.apache.magma.beans.BeanData;
+import org.apache.magma.beans.PropertyInfo;
+import org.apache.magma.database.mongo.test.domain.ImportantPerson;
+import org.apache.magma.database.mongo.test.domain.Person;
+import org.junit.Test;
+
+/** Verifies the JPA-related metadata that BeanData and PropertyInfo report for the Person test domain. */
+public class JpaParsingTest {
+    /** Walks class-level data for Person and ImportantPerson, then property-level data and the basic/relation field lists of Person. */
+    @Test
+    public void parsePerson() throws Exception {
+        BeanData personData = BeanData.getFor(Person.class);
+        // Person: a JPA class, not a sub-entity, table "person", no JPA super.
+        assertThat(personData.isJpaClass(), equalTo(true));
+        assertThat(personData.isJpaSubEntity(), equalTo(false));
+        assertThat(personData.getJpaTableName(), equalTo("person"));
+        assertThat(personData.getJpaSuper(), nullValue());
+        // ImportantPerson: same table name, with Person reported as its JPA super.
+        BeanData importantData = BeanData.getFor(ImportantPerson.class);
+        assertThat(importantData.isJpaClass(), equalTo(true));
+        assertThat(importantData.isJpaSubEntity(), equalTo(false));
+        assertThat(importantData.getJpaTableName(), equalTo("person"));
+        assertThat((Class)importantData.getJpaSuper(), equalTo(Person.class));
+        // "name": a non-transient column called "name".
+        PropertyInfo nameProp = personData.getProperty("name");
+        assertThat(nameProp, notNullValue());
+        assertThat(nameProp.isJpaTransient(), equalTo(false));
+        assertThat(nameProp.getJpaColName(), equalTo("name"));
+        assertThat(nameProp.isSubEntity(), equalTo(false));
+        // "cachedData": reported as JPA transient.
+        PropertyInfo cachedDataProp = personData.getProperty("cachedData");
+        assertThat(cachedDataProp, notNullValue());
+        assertThat(cachedDataProp.isJpaTransient(), equalTo(true));
+
+        // "addresses" and "friends": mongo-persisted, not sub-entities; friends cascades PERSIST first.
+        PropertyInfo addressesProp = personData.getProperty("addresses");
+        assertThat(addressesProp, notNullValue());
+        assertThat(addressesProp.isSubEntity(), equalTo(false));
+        assertThat(addressesProp.isMongoPersisted(), equalTo(true));
+
+        PropertyInfo friendsProp = personData.getProperty("friends");
+        assertThat(friendsProp, notNullValue());
+        assertThat(friendsProp.getJpaCascadeType(), notNullValue());
+        assertThat(friendsProp.getJpaCascadeType()[0], equalTo(CascadeType.PERSIST));
+        assertThat(friendsProp.isSubEntity(), equalTo(false));
+        assertThat(friendsProp.isMongoPersisted(), equalTo(true));
+        // "name" is a basic field, "friends" a relation field, and the transient property is in neither list.
+        List<PropertyInfo> basicFields = personData.getJpaBasicFields();
+        assertTrue(basicFields.contains(nameProp));
+        assertFalse(basicFields.contains(cachedDataProp));
+        assertFalse(basicFields.contains(friendsProp));
+
+        List<PropertyInfo> relationFields = personData.getJpaRelationFields();
+        assertFalse(relationFields.contains(nameProp));
+        assertFalse(relationFields.contains(cachedDataProp));
+        assertTrue(relationFields.contains(friendsProp));
+    }
+
+}
Added: labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/MetadataBasicsTest.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/MetadataBasicsTest.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/MetadataBasicsTest.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/MetadataBasicsTest.java Wed May 18 12:46:03 2011
@@ -0,0 +1,31 @@
+package org.apache.magma.database.mongo;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+
+import java.util.List;
+
+import org.apache.magma.database.mongo.test.domain.ImportantPerson;
+import org.apache.magma.database.mongo.test.domain.Person;
+import org.junit.Test;
+import static org.junit.matchers.JUnitMatchers.*;
+
+
+/** Exercises class registration and subclass lookup on {@link MongoMetadata}. */
+public class MetadataBasicsTest {
+    /** findSubs(Person) contains Person itself, and also ImportantPerson once that class has been added. */
+    @Test
+    public void metadataSubclasses() throws Exception {
+        MongoMetadata metadata = new MongoMetadata("test");
+        metadata.addClass(Person.class);
+        // With only Person registered, the lookup already contains Person.
+        List<Class> onlyPerson = metadata.findSubs(Person.class);
+        assertThat(onlyPerson, hasItem(equalTo((Class)Person.class)));
+
+        // After ImportantPerson is added, a fresh lookup contains both classes.
+        metadata.addClass(ImportantPerson.class);
+        List<Class> withSubclass = metadata.findSubs(Person.class);
+        assertThat(withSubclass, hasItem(equalTo((Class)Person.class)));
+        assertThat(withSubclass, hasItem(equalTo((Class)ImportantPerson.class)));
+    }
+}
Added: labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/MongoDbTestBase.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/MongoDbTestBase.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/MongoDbTestBase.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/MongoDbTestBase.java Wed May 18 12:46:03 2011
@@ -0,0 +1,52 @@
+package org.apache.magma.database.mongo;
+
+import org.apache.magma.basics.startup.Cycle;
+import org.apache.magma.database.mongo.test.utils.MongoTestCollection.Expect;
+import org.apache.magma.database.mongo.test.utils.MongoTestDB;
+import org.bson.BSONObject;
+import org.hamcrest.Matcher;
+import org.junit.After;
+import org.junit.Before;
+
+import static org.apache.magma.database.mongo.test.utils.DBObjectMatcher.*;
+import static org.hamcrest.CoreMatchers.*;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.DummyResultFactory;
+
+/** Base class for tests that run a {@link MongoDatabase} on top of a {@link MongoTestDB} loaded with expectations. */
+public class MongoDbTestBase {
+    protected MongoTestDB mdb;
+    protected Transaction tr;
+    protected MongoDatabase db;
+    /** Builds a fresh test db, registers expectations on "metadata" and "transactions", then opens the database and its transaction. */
+    @Before
+    public void initMongoDb() {
+        final String metadataId = "{'_id':'javaMetadatatest'}";
+        mdb = new MongoTestDB();
+        mdb.getCollection("test", "metadata")
+            .addExpectation(new Expect("findOne", new Matcher[] { dbObject(metadataId) }, null, true, false))
+            .addExpectation(new Expect("update", new Matcher[] { dbObject(metadataId), anything(), anything(), anything(), anything() }, DummyResultFactory.ok(1), true, true));
+        mdb.getCollection("test", "transactions")
+            .addExpectation(new Expect("insert", new Matcher[] { dbObject("{'state':'R'}"), anything() }, DummyResultFactory.ok(1), true, true))
+            .addExpectation(new Expect("update", new Matcher[] { anything(), dbObject("{'$set':{'state':'D'}}"), anything(), anything(), anything() }, DummyResultFactory.ok(1), true, true));
+        db = new MongoDatabase(mdb, "test");
+        Cycle.get().testMongoDatabase(db);
+        tr = db.getTransaction();
+        tr.setFastSecondPhase(false);
+    }
+    /** Tears the database down through Cycle, then runs checkAll() and clears the expectations on the test db. */
+    @After
+    public void closeMongoDb() {
+        Cycle.get().teardownMongoDatabase();
+        mdb.checkAll();
+        mdb.clearAllExpectations();
+    }
+    /** Returns a new {@link BasicDBObject} filled from {@code dbo} via putAll, so callers do not hand out {@code dbo} itself. */
+    protected DBObject cloneDbo(BSONObject dbo) {
+        BasicDBObject copy = new BasicDBObject();
+        copy.putAll(dbo);
+        return copy;
+    }
+}
Added: labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/ReadingTest.java
URL: http://svn.apache.org/viewvc/labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/ReadingTest.java?rev=1124229&view=auto
==============================================================================
--- labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/ReadingTest.java (added)
+++ labs/magma/trunk/database-mongodb/src/test/java/org/apache/magma/database/mongo/ReadingTest.java Wed May 18 12:46:03 2011
@@ -0,0 +1,176 @@
+package org.apache.magma.database.mongo;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.magma.database.mongo.test.domain.ImportantPerson;
+import org.apache.magma.database.mongo.test.domain.Person;
+import org.apache.magma.database.mongo.test.domain.Person.PersonType;
+
+import static org.apache.magma.database.mongo.test.utils.DBObjectMatcher.*;
+import org.junit.Test;
+
+import com.mongodb.BasicDBList;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.util.JSON;
+
+
+/** Read-path tests: loading by id, lazy relation loading, queries and field conversions, all against expectations set on the test db. */
+public class ReadingTest extends MongoDbTestBase {
+    /** Loads a person by id through one expected findOne; loading the same id again returns the same instance. */
+    @Test
+    public void simpleRead() throws Exception {
+        mdb.getCollection("test", "person")
+            .expectFindOne(dbObject("{'_id':'" + BSONPieces.simplePersonId + "'}"), cloneDbo(BSONPieces.simplePerson));
+
+        Person p = db.load(Person.class, BSONPieces.simplePersonId);
+
+        mdb.checkAll();
+        BSONPieces.assertSimplePersonLoaded(p);
+
+        assertThat(p.isFromDb(), equalTo(true));
+        assertThat(p.isOnDb(), equalTo(true));
+
+        // Loading an already cached element, should not call collection methods
+        Person p1 = db.load(Person.class, BSONPieces.simplePersonId);
+        assertThat(p1, sameInstance(p));
+        assertThat(p1.isFromDb(), equalTo(true));
+        assertThat(p1.isOnDb(), equalTo(true));
+        mdb.checkAll();
+    }
+    /** Loads the complex person, then getFriends() is served by one expected '$in' find returning the simple person. */
+    @Test
+    public void lazyLoadAnotherEntity() throws Exception {
+        mdb.getCollection("test", "person")
+            .expectFindOne(dbObject("{'_id':'" + BSONPieces.complexPersonId + "'}"), cloneDbo(BSONPieces.complexPerson));
+        Person p = db.load(Person.class, BSONPieces.complexPersonId);
+
+        mdb.checkAll();
+        BSONPieces.assertComplesPersonLoadedFields(p);
+
+        // try to load the friend, should load the simple person
+        mdb.getCollection("test", "person")
+            .expectFind(dbObject("{'_id': { '$in' : ['" + BSONPieces.simplePersonId + "']}}"), cloneDbo(BSONPieces.simplePerson));
+        Set<Person> friends = p.getFriends();
+
+        mdb.checkAll();
+        assertThat(friends, notNullValue());
+        assertThat(friends.size(), equalTo(1));
+        Person friend = friends.iterator().next();
+        BSONPieces.assertSimplePersonLoaded(friend);
+        assertThat(friend.isDirty(), equalTo(false));
+        assertThat(friend.isFromDb(), equalTo(true));
+        assertThat(friend.isOnDb(), equalTo(true));
+    }
+    /** Entities returned by query() and by named() are flagged as coming from the db. */
+    @Test
+    public void checkFromDb() throws Exception {
+        mdb.getCollection("test", "person")
+            .expectFind(anything("Any query"), cloneDbo(BSONPieces.simplePerson));
+        List<Person> people = db.query(Person.class, "");
+
+        assertThat(people, notNullValue());
+        assertThat(people.size(), equalTo(1));
+        BSONPieces.assertSimplePersonLoaded(people.get(0));
+        assertThat(people.get(0).isFromDb(), equalTo(true));
+
+        mdb.getCollection("test", "person")
+            .expectFind(anything("Any query"), cloneDbo(BSONPieces.simplePerson));
+        Person p = db.named(Person.class, "simone");
+        BSONPieces.assertSimplePersonLoaded(p);
+        assertThat(p.isFromDb(), equalTo(true));
+
+    }
+    // Disabled test, kept as written: it feeds a document carrying a '__transactions' entry (presumably pending replay support).
+    /*
+    @Test
+    public void replay() throws Exception {
+        DBObject dbo = cloneDbo(BSONPieces.simplePerson);
+        // {'__transactions': {'__transactionId' : '" + tr.getId() + "', 'name' :'Simone'} }
+        BasicDBList trlist = new BasicDBList();
+        BasicDBObject trobj = new BasicDBObject();
+        trobj.put("__transactionId", "123");
+        trobj.put("name", "Changed");
+        trlist.add(trobj);
+        dbo.put("__transactions", trlist);
+
+        mdb.getCollection("test", "person")
+            .expectFind(anything("Any query"), dbo);
+
+        List<Person> people = db.query(Person.class, "");
+
+        mdb.checkAll();
+
+        mdb.getCollection("test", "transactions")
+            .expectFindOne(dbObject("{'_id':'123'}"), (DBObject) JSON.parse(("{'_id':'123','state':'D','entities' : [{'col':'person','id':'" + BSONPieces.simplePersonId + "'}]}").replace('\'', '"')));
+
+        Person p = people.get(0);
+
+        assertThat(p.getName(), equalTo("Changed"));
+    }
+    */
+    /** Querying Person filters '_jcl' on both registered classes; querying ImportantPerson filters on that class alone. */
+    @Test
+    public void polymorphicQuery() throws Exception {
+        MongoMetadata meta = db.getMeta();
+        meta.addClass(Person.class);
+        meta.addClass(ImportantPerson.class);
+
+        // Hand out a copy, as every other test does, so the shared fixture is never exposed to the database layer.
+        mdb.getCollection("test", "person")
+            .expectFind(dbObject("{'_jcl': {'$in' : ['Person','ImportantPerson'] }}"), cloneDbo(BSONPieces.simplePerson));
+        db.query(Person.class, "");
+
+        mdb.checkAll();
+
+        mdb.getCollection("test", "person")
+            .expectFind(dbObject("{'_jcl': 'ImportantPerson'}"));
+
+        db.query(ImportantPerson.class, "");
+
+        mdb.checkAll();
+    }
+    /** A stored "isBool" value of true is read back through getBool(). */
+    @Test
+    public void readBoolean() throws Exception {
+        DBObject cloneDbo = cloneDbo(BSONPieces.simplePerson);
+        cloneDbo.put("isBool", true);
+        mdb.getCollection("test", "person")
+            .expectFindOne(dbObject("{'_id':'" + BSONPieces.simplePersonId + "'}"), cloneDbo);
+
+        Person p = db.load(Person.class, BSONPieces.simplePersonId);
+
+        mdb.checkAll();
+        BSONPieces.assertSimplePersonLoaded(p);
+        assertThat(p.getBool(), equalTo(true));
+
+    }
+    /** A stored "type" string of "FRIENDLY" is read back as PersonType.FRIENDLY. */
+    @Test
+    public void readEnum() throws Exception {
+        DBObject cloneDbo = cloneDbo(BSONPieces.simplePerson);
+        cloneDbo.put("type", "FRIENDLY");
+        mdb.getCollection("test", "person")
+            .expectFindOne(dbObject("{'_id':'" + BSONPieces.simplePersonId + "'}"), cloneDbo);
+
+        Person p = db.load(Person.class, BSONPieces.simplePersonId);
+
+        mdb.checkAll();
+        BSONPieces.assertSimplePersonLoaded(p);
+        assertThat(p.getType(), equalTo(PersonType.FRIENDLY));
+
+    }
+
+
+    // TODO test map loading
+
+    // TODO test lazy map loading
+
+    // TODO test reading of lazy one-to-one
+
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org