You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2015/09/17 22:34:09 UTC
svn commit: r1703684 - in /uima/sandbox/uima-ducc/trunk: src/main/admin/
uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/
uima-ducc-database/src/main/java/org/apache/uima/ducc/database/
uima-ducc-parent/ uima-ducc-sm/src...
Author: challngr
Date: Thu Sep 17 20:34:08 2015
New Revision: 1703684
URL: http://svn.apache.org/viewvc?rev=1703684&view=rev
Log:
UIMA-4577 Scripting fixes. Fix leak in loader and speed it up. Add indexes. Minor typos. Go to ODB 2.1.2
to pick up indexing fix.
Modified:
uima/sandbox/uima-ducc/trunk/src/main/admin/db_console
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java
uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/db_console
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/db_console?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/admin/db_console (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/admin/db_console Thu Sep 17 20:34:08 2015
@@ -38,8 +38,11 @@ class DbConsole(DuccUtil):
print "Note that Python must be at least version 2.6 but not 3.x. You are running version", sys.version_info
return
+ if ( self.db_parms == '--disabled--' ):
+ print "Database is disabled."
+ return
- (jvm_parms, classpath, db_rt, dburl, dbrest, dbroot) = self.db_parms()
+ (jvm_parms, classpath, db_rt, dburl, dbrest, dbroot) = self.db_parms
main = 'com.orientechnologies.orient.graph.console.OGremlinConsole'
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java Thu Sep 17 20:34:08 2015
@@ -31,6 +31,10 @@ public interface IStateServices {
public static String svc_reg_dir = IDuccEnv.DUCC_STATE_SVCREG_DIR;
public static String svc_hist_dir = IDuccEnv.DUCC_HISTORY_SVCREG_DIR;
+
+ public static final String archive_key = "is_archived";
+ public static final String archive_flag = "true";
+
public static final String svc = "svc";
public static final String meta = "meta";
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java Thu Sep 17 20:34:08 2015
@@ -38,6 +38,16 @@ public interface DbConstants
;
}
+ // Every vertex must inherit from here so we can use common indexes
+ public enum DuccVertexBase
+ implements Schema
+ {
+ VBase {
+ public String pname() { return "VDuccBase"; }
+ },
+ ;
+ }
+
public enum DbVertex
implements Schema
{
@@ -88,15 +98,25 @@ public interface DbConstants
}
+ public enum DuccEdgeBase
+ implements Schema
+ {
+ EdgeBase {
+ public String pname() { return "ducc_ebase"; }
+ },
+ ;
+ }
+
public enum DbEdge
implements Schema
{
//
// The convention is for edges to start with lower e and then a lower
//
- Edge { // Generic edge
- public String pname() { return "ducc_edge"; }
- },
+ // Edge { // Generic edge
+ // public String pname() { return "ducc_edge"; }
+ // },
+
Classpath { // All record types, detached classpath
public String pname() { return "eclasspath"; }
},
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java Thu Sep 17 20:34:08 2015
@@ -21,10 +21,12 @@ package org.apache.uima.ducc.database;
import org.apache.uima.ducc.database.DbConstants.DbEdge;
import org.apache.uima.ducc.database.DbConstants.DbVertex;
+import org.apache.uima.ducc.database.DbConstants.DuccVertexBase;
import com.orientechnologies.orient.client.remote.OServerAdmin;
import com.orientechnologies.orient.core.metadata.schema.OProperty;
import com.orientechnologies.orient.core.metadata.schema.OType;
+import com.orientechnologies.orient.core.sql.OCommandSQL;
import com.tinkerpop.blueprints.impls.orient.OrientEdgeType;
import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory;
import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx;
@@ -33,6 +35,8 @@ import com.tinkerpop.blueprints.impls.or
public class DbCreate
{
String dburl;
+ String adminid = "root";
+ String adminpw = null;
OServerAdmin admin;
OrientGraphFactory factory;
@@ -41,6 +45,13 @@ public class DbCreate
this.dburl = dburl;
}
+ public DbCreate(String dburl, String adminid, String adminpw)
+ {
+ this.dburl = dburl;
+ this.adminid = adminid;
+ this.adminpw = adminpw;
+ }
+
void createEdgeType(OrientGraphNoTx g, DbEdge id)
{
String s = id.pname();
@@ -57,16 +68,35 @@ public class DbCreate
OrientVertexType e = g.getVertexType(s);
if ( e == null ) {
System.out.println("Create vertex " + s);
- e = g.createVertexType(s);
- OProperty p = e.createProperty(DbConstants.DUCCID, OType.LONG);
- p.setMandatory(true);
+ e = g.createVertexType(s, DuccVertexBase.VBase.pname());
}
}
void createSchema()
{
+ String methodName = "createSchema";
OrientGraphNoTx g = factory.getNoTx();
+ String base = DuccVertexBase.VBase.pname();
+ OrientVertexType e = g.getVertexType(base);
+ if ( e == null ) {
+ System.out.println("Create base vertex class " + base);
+ e = g.createVertexType(base);
+ OProperty p = e.createProperty(DbConstants.DUCCID, OType.LONG);
+ p.setMandatory(true);
+ OProperty p2 = e.createProperty(DbConstants.DUCC_DBCAT, OType.STRING);
+ p2.setMandatory(true);
+
+ String sql = "create index i_ducc_dbid on " + base + "(" + DbConstants.DUCCID + ") notunique";
+ g.command(new OCommandSQL(sql)).execute();
+ System.out.println("(sql)Created index i_ducc_dbid on class " + base + " for " + DbConstants.DUCCID);
+
+ sql = "create index i_ducc_dbcat on " + base + "(" + DbConstants.DUCC_DBCAT + ") notunique";
+ g.command(new OCommandSQL(sql)).execute();
+ System.out.println("(sql)Created index i_ducc_dbcat on class " + base + " for " + DbConstants.DUCC_DBCAT);
+
+ }
+
for ( DbVertex o : DbVertex.values() ) {
createVertexType(g, o);
}
@@ -77,6 +107,22 @@ public class DbCreate
g.shutdown();
}
+ boolean createPlocalDatabase()
+ throws Exception
+ {
+ boolean ret = false;
+ try {
+ factory = new OrientGraphFactory(dburl, "admin", "admin");
+ createSchema();
+ ret = true;
+ } catch ( Exception e ) {
+ e.printStackTrace();
+ } finally {
+ factory.close();
+ }
+ return ret;
+ }
+
/**
* Create the database and initialize the schema. This is intended to be called only from Main at
* system startup, to insure all users of the db have a db when they start.
@@ -84,11 +130,13 @@ public class DbCreate
boolean createDatabase()
throws Exception
{
- String pw = DbManager.dbPassword();
+ if ( adminpw == null ) {
+ adminpw = DbManager.dbPassword();
+ }
try {
admin = new OServerAdmin(dburl);
- admin.connect("root", pw); // connect to the server
+ admin.connect(adminid, adminpw); // connect to the server
if ( ! admin.existsDatabase("plocal") ) {
System.out.println("Database " + dburl + " does not exist, attempting to create it.");
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java Thu Sep 17 20:34:08 2015
@@ -429,10 +429,8 @@ public class DbHandle
OrientVertex ov = null;
// logger.info(methodName, null, duccid, "Create new db record of type", typeName);
- ov = graphDb.addVertex("class:" + typeName);
- ov.setProperties(DbConstants.DUCCID, duccid);
+ ov = graphDb.addVertex("class:" + typeName, DbConstants.DUCCID, duccid, DbConstants.DUCC_DBCAT, dbcat.pname());
ov.setProperties(props);
- ov.setProperty(DbConstants.DUCC_DBCAT, dbcat.pname());
return ov;
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java Thu Sep 17 20:34:08 2015
@@ -4,20 +4,27 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.uima.ducc.common.Pair;
import org.apache.uima.ducc.common.persistence.services.IStateServices;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.database.DbConstants.DbCategory;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
+import com.orientechnologies.orient.core.config.OGlobalConfiguration;
+
/**
* Toy orientdb loader to load a historydb from ducc history
*/
@@ -30,9 +37,10 @@ public class DbLoader
StateServicesDb ssd = null;
// String history_url = "remote:localhost/DuccHistory";
- String state_url = "remote:localhost/DuccState";
+ // String state_url = "plocal:/home/challngr/ducc_runtime_db/database/databases/DuccHistoryT";
+ String state_url = "plocal:/users/challngr/DuccHistoryT";
- // String jobHistory = System.getProperty("user.home") + "/ducc_runtime/history/jobs";
+ // String jobHistory = System.getProperty("user.home") + "/ducc_runtime_db/history/jobs";
String jobHistory = "/home/ducc/ducc_runtime/history/jobs";
// String reservationHistory = System.getProperty("user.home") + "/ducc_runtime/history/reservations";
@@ -47,6 +55,10 @@ public class DbLoader
//String serviceRegistry = System.getProperty("user.home") + "/ducc_runtime/state/services";
String serviceRegistry = "/home/ducc/ducc_runtime/state/services";
+ String checkpointFile = "/home/ducc/ducc_runtime/state/orchestrator.ckpt";
+ String archive_key = IStateServices.archive_key;
+ String archive_flag = IStateServices.archive_flag;
+
int nthreads = 40;
AtomicInteger counter = new AtomicInteger(0);
@@ -65,16 +77,17 @@ public class DbLoader
public void loadJobs()
{
String methodName = "loadJobs";
- LinkedBlockingQueue<File> jobqueue = new LinkedBlockingQueue<File>();
+ LinkedBlockingQueue<File> queue = new LinkedBlockingQueue<File>();
- int max_to_load = Integer.MAX_VALUE; // or Integer.MAX_VALUE for 'all of them'
+ // int max_to_load = Integer.MAX_VALUE; // or Integer.MAX_VALUE for 'all of them'
+ int max_to_load = 1000; // or Integer.MAX_VALUE for 'all of them'
int nth = Math.min(nthreads, max_to_load);
JobLoader[] loader = new JobLoader[nth];
Thread[] threads = new Thread[nth];
List<Long> ids = new ArrayList<Long>();
for ( int i = 0; i < nth; i++ ) {
- loader[i] = new JobLoader(jobqueue, ids);
+ loader[i] = new JobLoader(queue, ids);
threads[i] = new Thread(loader[i]);
threads[i].start();
}
@@ -88,7 +101,7 @@ public class DbLoader
String s = f.toString();
if ( s.endsWith(".dwj") ) {
logger.info(methodName, null, "Loading file", c++, ":", f);
- jobqueue.offer(f);
+ queue.offer(f);
counter.getAndIncrement();
if ( c >= max_to_load ) break;
@@ -120,51 +133,35 @@ public class DbLoader
public void loadReservations()
{
String methodName = "loadReservations";
-
- LinkedBlockingQueue<IDuccWorkReservation> queue = new LinkedBlockingQueue<IDuccWorkReservation>();
+ LinkedBlockingQueue<File> queue = new LinkedBlockingQueue<File>();
- int max_to_load = Integer.MAX_VALUE;
+ //int max_to_load = Integer.MAX_VALUE;
+ int max_to_load = 1000;
int nth = Math.min(nthreads, max_to_load);
ReservationLoader[] loader = new ReservationLoader[nth];
Thread[] threads = new Thread[nth];
ArrayList<Long> ids = new ArrayList<Long>();
-
+
for ( int i = 0; i < nth; i++ ) {
loader[i] = new ReservationLoader(queue, ids);
threads[i] = new Thread(loader[i]);
threads[i].start();
}
-
+
File dir = new File(reservationHistory);
- int c = 0;
-
File[] files = dir.listFiles();
logger.info(methodName, null, "Reading", files.length, "reservation instances.");
- for ( File f : dir.listFiles() ) {
+
+ int c = 0;
+ for ( File f : files ) {
String s = f.toString();
if ( s.endsWith(".dwr") ) {
logger.info(methodName, null, "Loading file", c++, ":", f);
- IDuccWorkReservation res = null;
- FileInputStream fis = null;
- ObjectInputStream in = null;
+
+ queue.offer(f);
+ counter.getAndIncrement();
- try {
- long now = System.currentTimeMillis();
- fis = new FileInputStream(f);
- in = new ObjectInputStream(fis);
- res = (IDuccWorkReservation) in.readObject();
- logger.info(methodName, res.getDuccId(), "Time to read reservation:", System.currentTimeMillis() - now);
-
- queue.offer(res);
- counter.getAndIncrement();
- } catch(Exception e) {
- logger.info(methodName, null, e);
- } finally {
- // restoreJob(job.getDuccId().getFriendly());
- closeStream(in);
- closeStream(fis);
- if ( c >= max_to_load ) break;
- }
+ if ( c >= max_to_load ) break;
} else {
logger.info(methodName, null, "Can't find history file", f);
}
@@ -188,23 +185,16 @@ public class DbLoader
logger.info(methodName, null, "Joining thread (reservations).", i);
try { threads[i].join(); } catch ( InterruptedException e ) {}
}
-
- // try {
- // List<IDuccWorkReservation> ress = hmd.restoreReservations(c);
- // logger.info(methodName, null, "Recovered", ress.size(), "reservations.");
- // } catch (Exception e) {
- // // TODO Auto-generated catch block
- // e.printStackTrace();
- // }
}
public void loadServices()
{
String methodName = "loadServices";
- LinkedBlockingQueue<IDuccWorkService> queue = new LinkedBlockingQueue<IDuccWorkService>();
+ LinkedBlockingQueue<File> queue = new LinkedBlockingQueue<File>();
- int max_to_load = Integer.MAX_VALUE;
+ // int max_to_load = Integer.MAX_VALUE;
+ int max_to_load = 1000;
int nth = Math.min(nthreads, max_to_load);
ServiceLoader[] loader = new ServiceLoader[nth];
Thread[] threads = new Thread[nth];
@@ -217,36 +207,19 @@ public class DbLoader
}
File dir = new File(serviceHistory);
- int c = 0;
-
File[] files = dir.listFiles();
logger.info(methodName, null, "Reading", files.length, "service instances.");
+
+ int c = 0;
for ( File f : files ) {
String s = f.toString();
if ( s.endsWith(".dws") ) {
logger.info(methodName, null, "Loading file", c++, ":", f);
- IDuccWorkService svc = null;
- FileInputStream fis = null;
- ObjectInputStream in = null;
- try {
- long now = System.currentTimeMillis();
- fis = new FileInputStream(f);
- in = new ObjectInputStream(fis);
- svc = (IDuccWorkService) in.readObject();
- logger.info(methodName, svc.getDuccId(), "Time to read service:", System.currentTimeMillis() - now);
-
+ queue.offer(f);
+ counter.getAndIncrement();
- queue.offer(svc);
- counter.getAndIncrement();
- } catch(Exception e) {
- logger.info(methodName, null, e);
- } finally {
- // restoreJob(job.getDuccId().getFriendly());
- closeStream(in);
- closeStream(fis);
- if ( c >= max_to_load ) break;
- }
+ if ( c >= max_to_load ) break;
} else {
logger.info(methodName, null, "Can't find history file", f);
}
@@ -254,7 +227,7 @@ public class DbLoader
while ( (c = counter.get()) != 0 ) {
try {
- logger.info(methodName, null, "Waiting for service loads to finish, counter is", c);
+ logger.info(methodName, null, "Waiting for loads to finish, counter is", c, "(service instances");
Thread.sleep(1000);
}
catch ( Exception e ) {}
@@ -270,21 +243,13 @@ public class DbLoader
try { threads[i].join(); } catch ( InterruptedException e ) {}
}
- // try {
- // List<IDuccWorkService> services = hmd.restoreServices(c);
- // logger.info(methodName, null, "Recovered", services.size(), "serves.");
- // } catch (Exception e) {
- // // TODO Auto-generated catch block
- // e.printStackTrace();
- // }
-
}
- public void loadServiceRegistry(String registry)
+ public void loadServiceRegistry(String registry, boolean isHistory)
{
String methodName = "loadServiceRegistry";
- LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+ LinkedBlockingQueue<Pair<String, Boolean>> queue = new LinkedBlockingQueue<Pair<String, Boolean>>();
int max_to_load = Integer.MAX_VALUE;
int nth = Math.min(nthreads, max_to_load);
@@ -307,7 +272,7 @@ public class DbLoader
if ( s.endsWith(".svc") ) {
int ndx = s.indexOf(".svc");
String numeric = s.substring(0, ndx);
- queue.offer(numeric);
+ queue.offer(new Pair<String, Boolean>(numeric, isHistory));
counter.getAndIncrement();
if ( ++c >= max_to_load ) break;
@@ -335,34 +300,78 @@ public class DbLoader
}
+ void loadCheckpoint()
+ throws Exception
+ {
+ String methodName = "loadCheckpoint";
+
+ //Checkpointable obj = null;
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+ try {
+ fis = new FileInputStream(checkpointFile);
+ in = new ObjectInputStream(fis);
+
+ Object xobj = (Object) in.readObject();
+ Class<?> cl = xobj.getClass();
+ Field p2jfield = cl.getDeclaredField("processToJobMap");
+ p2jfield.setAccessible(true);
+ ConcurrentHashMap<DuccId, DuccId> p2jmap = (ConcurrentHashMap<DuccId, DuccId>) p2jfield.get(xobj);
+
+ Field wmField = cl.getDeclaredField("workMap");
+ wmField.setAccessible(true);
+ DuccWorkMap workMap = (DuccWorkMap) wmField.get(xobj);
+
+ hmd.checkpoint(workMap, p2jmap);
+
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } finally {
+ fis.close();
+ in.close();
+ }
+
+ }
+
void run()
throws Exception
{
String methodName = "run";
DbCreate cr = new DbCreate(state_url);
- cr.createDatabase();
+ cr.createPlocalDatabase();
- // load the history db
+ logger.info(methodName, null, "storage.useWAL", System.getProperty("storage.useWAL"));
+ logger.info(methodName, null, "tx.useLog", System.getProperty("tx.useLog"));
if ( true ) {
try {
+ OGlobalConfiguration.dumpConfiguration(System.out);
+
hmd = new HistoryManagerDb(logger);
+ if ( true ) loadCheckpoint();
+
+ OGlobalConfiguration.USE_WAL.setValue(false);
+
+ OGlobalConfiguration.dumpConfiguration(System.out);
+
+
// ---------- Load job history
- loadJobs();
- if ( true ) return;
+ if ( true ) loadJobs();
// ---------- Load reservation history
- loadReservations();
-
+ if ( true ) loadReservations();
+
+
// ---------- Load service isntance and AP history
- loadServices();
+ if ( true ) loadServices();
// ---------- Load service registry
ssd = new StateServicesDb();
ssd.init(logger);
- loadServiceRegistry(serviceRegistry);
+ loadServiceRegistry(serviceRegistry, false);
try {
ssd.shutdown();
} catch ( Exception e ) {
@@ -372,7 +381,13 @@ public class DbLoader
// ---------- Load service registry history
ssd = new StateServicesDb();
ssd.init(logger);
- loadServiceRegistry(serviceRegistryHistory);
+ loadServiceRegistry(serviceRegistryHistory, true);
+
+ OGlobalConfiguration.USE_WAL.setValue(true);
+ if ( true ) loadCheckpoint();
+
+
+
} catch ( Exception e ) {
logger.error(methodName, null, e);
@@ -415,7 +430,6 @@ public class DbLoader
File f = null;
IDuccWorkJob job = null;
try {
- // logger.info(methodName, null, "About to take (job).");
f = queue.take();
FileInputStream fis = null;
@@ -430,7 +444,6 @@ public class DbLoader
} catch(Exception e) {
logger.info(methodName, null, e);
} finally {
- // restoreJob(job.getDuccId().getFriendly());
closeStream(in);
closeStream(fis);
}
@@ -454,9 +467,9 @@ public class DbLoader
class ServiceLoader
implements Runnable
{
- BlockingQueue<IDuccWorkService> queue;
+ BlockingQueue<File> queue;
List<Long> ids;
- ServiceLoader(BlockingQueue<IDuccWorkService> queue, List<Long> ids)
+ ServiceLoader(BlockingQueue<File> queue, List<Long> ids)
{
this.queue = queue;
this.ids = ids;
@@ -467,24 +480,36 @@ public class DbLoader
String methodName = "ServiceLoader.run";
while ( true ) {
IDuccWorkService svc = null;
+ File f = null;
try {
- logger.info(methodName, null, "About to take (service).");
- svc = queue.take();
+ f = queue.take();
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+
+ try {
+ long now = System.currentTimeMillis();
+ fis = new FileInputStream(f);
+ in = new ObjectInputStream(fis);
+ svc = (IDuccWorkService) in.readObject();
+ logger.info(methodName, svc.getDuccId(), "Time to read service:", System.currentTimeMillis() - now);
+ } catch(Exception e) {
+ logger.info(methodName, null, e);
+ } finally {
+ closeStream(in);
+ closeStream(fis);
+ }
+ hmd.saveServiceUnsafe(svc);
+
} catch ( InterruptedException e ) {
return;
- }
- logger.info(methodName, svc.getDuccId(), "Took a Service");
- try {
- //h = dbManager.open();
- hmd.saveServiceUnsafe(svc);
- //h.close();
- synchronized(ids) {
- ids.add(svc.getDuccId().getFriendly());
- }
- counter.getAndDecrement();
- } catch(Exception e) {
+ } catch ( Exception e ){
logger.info(methodName, null, e);
- }
+ }
+
+ synchronized(ids) {
+ ids.add(svc.getDuccId().getFriendly());
+ }
+ counter.getAndDecrement();
}
}
}
@@ -492,9 +517,9 @@ public class DbLoader
class ReservationLoader
implements Runnable
{
- BlockingQueue<IDuccWorkReservation> queue;
+ BlockingQueue<File> queue;
List<Long> ids;
- ReservationLoader(BlockingQueue<IDuccWorkReservation> queue, List<Long> ids)
+ ReservationLoader(BlockingQueue<File> queue, List<Long> ids)
{
this.queue = queue;
this.ids = ids;
@@ -505,24 +530,36 @@ public class DbLoader
String methodName = "ReservationLoader.run";
while ( true ) {
IDuccWorkReservation res = null;
+ File f = null;
try {
- logger.info(methodName, null, "About to take (reservation).");
- res = queue.take();
+ f = queue.take();
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+
+ try {
+ long now = System.currentTimeMillis();
+ fis = new FileInputStream(f);
+ in = new ObjectInputStream(fis);
+ res = (IDuccWorkReservation) in.readObject();
+ logger.info(methodName, res.getDuccId(), "Time to read reservation:", System.currentTimeMillis() - now);
+ } catch(Exception e) {
+ logger.info(methodName, null, e);
+ } finally {
+ closeStream(in);
+ closeStream(fis);
+ }
+ hmd.saveReservationUnsafe(res);
+
} catch ( InterruptedException e ) {
return;
- }
- logger.info(methodName, res.getDuccId(), "Took a Service");
- try {
- //h = dbManager.open();
- hmd.saveReservationUnsafe(res);
- //h.close();
- synchronized(ids) {
- ids.add(res.getDuccId().getFriendly());
- }
- counter.getAndDecrement();
- } catch(Exception e) {
+ } catch ( Exception e ){
logger.info(methodName, null, e);
- }
+ }
+
+ synchronized(ids) {
+ ids.add(res.getDuccId().getFriendly());
+ }
+ counter.getAndDecrement();
}
}
}
@@ -531,9 +568,9 @@ public class DbLoader
class ServiceRegistrationLoader
implements Runnable
{
- BlockingQueue<String> queue;
+ BlockingQueue<Pair<String, Boolean>> queue;
List<Long> ids;
- ServiceRegistrationLoader(BlockingQueue<String> queue, List<Long> ids)
+ ServiceRegistrationLoader(BlockingQueue<Pair<String, Boolean>> queue, List<Long> ids)
{
this.queue = queue;
this.ids = ids;
@@ -543,10 +580,14 @@ public class DbLoader
{
String methodName = "ServiceRegistrationLoader.run";
while ( true ) {
+ Pair<String, Boolean> p = null;
String id = null;
+ boolean isHistory;
try {
logger.info(methodName, null, "About to take (service id).");
- id = queue.take();
+ p = queue.take();
+ id = p.first();
+ isHistory = p.second();
} catch ( InterruptedException e ) {
return;
}
@@ -564,6 +605,10 @@ public class DbLoader
meta_in = new FileInputStream(meta_name);
svc_props.load(svc_in);
meta_props.load(meta_in);
+ if ( isHistory ) {
+ meta_props.setProperty(archive_key, archive_flag);
+ }
+
String sid = meta_props.getProperty(IStateServices.SvcProps.numeric_id.pname());
if ( sid == null ) {
logger.warn(methodName, null, "Cannot find service id in meta file for", id);
@@ -575,8 +620,11 @@ public class DbLoader
}
DuccId did = new DuccId(Long.parseLong(sid));
- ssd.storePropertiesUnsafe(did, svc_props, meta_props);
-
+ if ( isHistory ) {
+ ssd.storePropertiesUnsafe(did, svc_props, meta_props, DbCategory.History);
+ } else {
+ ssd.storePropertiesUnsafe(did, svc_props, meta_props, DbCategory.SmReg);
+ }
synchronized(ids) {
ids.add(did.getFriendly());
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java Thu Sep 17 20:34:08 2015
@@ -14,6 +14,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.uima.ducc.common.persistence.services.IStateServices;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.database.DbConstants.DbCategory;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
@@ -570,7 +571,7 @@ public class DbTester
}
DuccId did = new DuccId(Long.parseLong(sid));
- ssd.storePropertiesUnsafe(did, svc_props, meta_props);
+ ssd.storePropertiesUnsafe(did, svc_props, meta_props, DbCategory.SmReg);
synchronized(ids) {
ids.add(did.getFriendly());
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java Thu Sep 17 20:34:08 2015
@@ -269,7 +269,7 @@ public class HistoryManagerDb
if ( safe ) {
h = dbManager.open();
} else {
- h = dbManager.open();
+ h = dbManager.openNoTx();
}
if ( safe && h.thingInDatabase(id, type, dbcat) ) {
logger.warn(methodName, j.getDuccId(), "Not overwriting saved job.");
@@ -289,6 +289,7 @@ public class HistoryManagerDb
throw e;
} finally {
h.commit();
+ h.close();
}
}
@@ -377,7 +378,7 @@ public class HistoryManagerDb
try {
h = dbManager.open();
Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.Job.pname() + " WHERE ducc_dbid=" + friendly_id +
- " AND " + DbConstants.DUCC_DBCAT + "='" + DbCategory.History.pname());
+ " AND " + DbConstants.DUCC_DBCAT + "='" + DbCategory.History.pname() + "'");
for ( Vertex v : q ) {
// There's only 1 unless db is broken.
return restoreJobInternal(h, (OrientVertex) v);
@@ -448,7 +449,7 @@ public class HistoryManagerDb
Gson g = mkGsonForJob();
String dbres = g.toJson(r);
- logger.info(methodName, null, "------------------- Reservation JSON: " + dbres);
+ // logger.info(methodName, null, "------------------- Reservation JSON: " + dbres);
// Must repair these things because OR continues to use the job after it has been
// written to history.
@@ -481,7 +482,11 @@ public class HistoryManagerDb
Long id = r.getDuccId().getFriendly();
DbHandle h = null;
try {
- h = dbManager.open();
+ if ( safe ) {
+ h = dbManager.open();
+ } else {
+ h = dbManager.openNoTx();
+ }
if ( safe && h.thingInDatabase(id, DbVertex.Reservation, dbcat) ) {
h.close();
return;
@@ -528,7 +533,7 @@ public class HistoryManagerDb
JsonObject jo = mkJsonObject(json);
Gson g = mkGsonForJob();
- logger.info(methodName, null, g.toJson(jo));
+ // logger.info(methodName, null, g.toJson(jo));
r = g.fromJson(jo, DuccWorkReservation.class);
@@ -536,9 +541,9 @@ public class HistoryManagerDb
if ( l != null ) {
for (JdReservationBean b : l ) {
ConcurrentHashMap<DuccId, SizeBytes> map = b.getMap();
- for ( DuccId k : map.keySet() ) {
- logger.info(methodName, null, "REST ===> " + k.getFriendly() + " " + k.getUnique() + " : " + map.get(k));
- }
+ //for ( DuccId k : map.keySet() ) {
+ // logger.info(methodName, null, "REST ===> " + k.getFriendly() + " " + k.getUnique() + " : " + map.get(k));
+ //}
}
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java Thu Sep 17 20:34:08 2015
@@ -164,23 +164,27 @@ public class StateServicesDb
* equivalent in the Java interface? If so, we should modify this to use it
* and can then eliminate the 'safe' flag.
*/
- boolean storePropertiesInternal (DuccId serviceId, Properties svc_props, Properties meta_props, boolean safe)
+ boolean storePropertiesInternal (DuccId serviceId, Properties svc_props, Properties meta_props, boolean safe, DbCategory category)
{
String methodName = "storePropertiesInternal";
DbHandle h = null;
try {
- h = dbManager.open();
+ if ( safe ) {
+ h = dbManager.open();
+ } else {
+ h = dbManager.openNoTx();
+ }
Long id = serviceId.getFriendly();
if ( safe ) {
- if ( h.thingInDatabase(id, DbVertex.ServiceReg, DbCategory.SmReg) ) {
+ if ( h.thingInDatabase(id, DbVertex.ServiceReg, category) ) {
return false;
}
}
- h.createPropertiesObject(svc_props, DbVertex.ServiceReg, id, DbCategory.SmReg);
- h.createPropertiesObject(meta_props, DbVertex.ServiceMeta, id, DbCategory.SmReg);
+ h.createPropertiesObject(svc_props, DbVertex.ServiceReg, id, category);
+ h.createPropertiesObject(meta_props, DbVertex.ServiceMeta, id, category);
h.commit();
return true;
} catch ( Exception e ) {
@@ -196,9 +200,9 @@ public class StateServicesDb
* Save the props into the database, don't check to see if they're there already. Used by the
* loader for converting old registries to the db.
*/
- public boolean storePropertiesUnsafe(DuccId serviceId, Properties svc_props, Properties meta_props)
+ public boolean storePropertiesUnsafe(DuccId serviceId, Properties svc_props, Properties meta_props, DbCategory category)
{
- return storePropertiesInternal(serviceId, svc_props, meta_props, false);
+ return storePropertiesInternal(serviceId, svc_props, meta_props, false, category);
}
/**
@@ -209,7 +213,7 @@ public class StateServicesDb
*/
public boolean storeProperties(DuccId serviceId, Properties svc_props, Properties meta_props)
{
- return storePropertiesInternal(serviceId, svc_props, meta_props, true);
+ return storePropertiesInternal(serviceId, svc_props, meta_props, true, DbCategory.SmReg);
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml Thu Sep 17 20:34:08 2015
@@ -165,7 +165,7 @@
<jetty.version>7.4.4.v20110707</jetty.version>
<orbit-org-apache-jasper.version>2.1.0.v201110031002</orbit-org-apache-jasper.version>
<servlet-api.version>2.5</servlet-api.version>
- <orientdb.version>2.1.0</orientdb.version>
+ <orientdb.version>2.1.2</orientdb.version>
<orientdb.studio.version>2.0-M3</orientdb.studio.version>
<http.commons.client.version>4.3.5</http.commons.client.version>
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1703684&r1=1703683&r2=1703684&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Thu Sep 17 20:34:08 2015
@@ -163,8 +163,8 @@ public class ServiceSet
String[] coOwners = null;
- static final String archive_key = "is_archived";
- static final String archive_flag = "true";
+ String archive_key = IStateServices.archive_key;
+ String archive_flag = IStateServices.archive_flag;
//
// Constructor for a registered service