You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2015/10/12 18:10:55 UTC
svn commit: r1708149 [1/2] - in /uima/sandbox/uima-ducc/trunk:
src/main/admin/ src/main/resources/
uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/monitor/
uima-ducc-common/src/main/java/org/apache/uima/ducc/common/
uima-ducc-common/src/main/j...
Author: challngr
Date: Mon Oct 12 16:10:55 2015
New Revision: 1708149
URL: http://svn.apache.org/viewvc?rev=1708149&view=rev
Log:
UIMA-4577 RM node state persistence. Loader updates. Service reg updates.
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/RmPersistenceFactory.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbListLoader.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
Modified:
uima/sandbox/uima-ducc/trunk/src/main/admin/ducc.py
uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_util.py
uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/monitor/AgentMonitor.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/ANodeStability.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/NullStateServices.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServices.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/SmLoader.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/event/ServiceManagerEventListener.java
uima/sandbox/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/nodeviz/NodeViz.java
Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/ducc.py
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/ducc.py?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/admin/ducc.py (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/admin/ducc.py Mon Oct 12 16:10:55 2015
@@ -66,6 +66,10 @@ class Ducc(DuccUtil):
else:
jp = jp + k + '=' + v + ' '
+
+ if ( self.db_jvm_args != None ):
+ jp = jp + ' ' + self.db_jvm_args
+
cmd = ' '.join(['nohup', self.java(), jp, '-cp', classpath, main, '&'])
print cmd
@@ -177,6 +181,7 @@ class Ducc(DuccUtil):
jvm_opts.append("-Dducc.rm.override.dram=" + rmoverride)
if ( self.rm_jvm_args != None ):
jvm_opts.append(self.rm_jvm_args)
+ self.add_to_classpath(ducc_home + '/lib/orientdb/*')
if ( c == 'ws' ):
#see if the ws jsp compilation directory is specified
@@ -251,7 +256,7 @@ class Ducc(DuccUtil):
if ( args != None ):
cmd.append(args)
- #print 'CMD', cmd
+ print 'CMD', cmd
if ( pid == None ):
if ( background ):
Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_util.py
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_util.py?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_util.py (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_util.py Mon Oct 12 16:10:55 2015
@@ -183,6 +183,7 @@ class DuccUtil(DuccBase):
rt = self.DUCC_HOME # (ducc runtime)
db_rt = rt + '/database' # ORIENTDB_HOME - the database "home" place
+
jvm_parms = {
'-Dfile.encoding' : 'UTF8',
'-Drhino.opt.level' : '9',
@@ -192,6 +193,10 @@ class DuccUtil(DuccBase):
'-Dorientdb.config.file' : rt + '/' + dbconfig,
'-Dorientdb.www.path' : db_rt + '/www',
'-Djava.util.logging.config.file' : rt + '/resources/database.log.config',
+ '-Dcom.sun.management.jmxremote' : None,
+ '-Dcom.sun.management.jmxremote.ssl' : 'false',
+ '-Dcom.sun.management.jmxremote.authenticate': 'false',
+ '-Dcom.sun.management.jmxremote.port': '1098',
}
classpath = '"' + rt + '/lib/orientdb/*'
classpath = classpath + ':' + rt + '/lib/jna/*' + '"'
@@ -261,7 +266,7 @@ class DuccUtil(DuccBase):
line = lines.readline().strip()
except:
break
- #print '[]', line
+ print '[]', line
if ( not line ):
break
@@ -399,7 +404,7 @@ class DuccUtil(DuccBase):
def nohup(self, cmd, showpid=True):
cmd = ' '.join(cmd)
- # print '**** nohup', cmd, '****'
+ print '**** nohup', cmd, '****'
devnw = open(os.devnull, 'w')
devnr = open(os.devnull, 'r')
ducc = subprocess.Popen(cmd, shell=True, stdin=devnr, stdout=devnw, stderr=devnw)
@@ -412,7 +417,7 @@ class DuccUtil(DuccBase):
def ssh(self, host, do_wait, *CMD):
cmd = ' '.join(CMD)
- #print 'ssh -o BatchMode=yes -o ConnectTimeout=10', host, cmd
+ print 'ssh -o BatchMode=yes -o ConnectTimeout=10', host, cmd
if ( do_wait ):
return self.popen('ssh -q -o BatchMode=yes -o ConnectTimeout=10', host, cmd)
else:
Modified: uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/resources/default.ducc.properties Mon Oct 12 16:10:55 2015
@@ -152,6 +152,7 @@ ducc.rm.jvm.args = -Xmx1G
ducc.pm.jvm.args = -Xmx1G
ducc.sm.jvm.args = -Xmx1G
ducc.ws.jvm.args = -Xmx2G -Djava.util.Arrays.useLegacyMergeSort=true
+ducc.database.jvm.args = -Xmx8G -Dstorage.useWAL=false -Dstorage.diskCache.bufferSize=8192 -Dtx.useLog=false
# ========== General Configuration block ==========
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/monitor/AgentMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/monitor/AgentMonitor.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/monitor/AgentMonitor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/monitor/AgentMonitor.java Mon Oct 12 16:10:55 2015
@@ -52,4 +52,7 @@ public class AgentMonitor extends ANodeS
public void ping(Node node) {
super.nodeArrives(node);
}
+
+ public void nodeRecovers(Node n) {}
+
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/ANodeStability.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/ANodeStability.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/ANodeStability.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/ANodeStability.java Mon Oct 12 16:10:55 2015
@@ -61,6 +61,9 @@ public abstract class ANodeStability
*/
public synchronized void nodeArrives(Node n)
{
+ if ( heartbeats.containsKey(n) && (heartbeats.get(n) > 1) ) {
+ nodeRecovers(n);
+ }
heartbeats.put(n, 0);
}
@@ -71,6 +74,11 @@ public abstract class ANodeStability
public abstract void missedNode(Node n, int c);
/**
+ * Called when a node that had missed heartbeats has recovered.
+ */
+ public abstract void nodeRecovers(Node n);
+
+ /**
* Graceful shutdown of the thread.
*/
public synchronized void shutdown()
@@ -103,11 +111,11 @@ public abstract class ANodeStability
}
if ( deadNodes.size() > 0 ) {
- nodeDeath(deadNodes); // tell implementors
- for ( Node n : deadNodes.keySet() ) { // clear from list of known nodes
- heartbeats.remove(n); // so we don't keep harassing implementors
+ nodeDeath(deadNodes); // tell implementors
+ for ( Node n : deadNodes.keySet() ) { // clear from list of known nodes
+ heartbeats.remove(n); // so we don't keep harassing implementors
}
- deadNodes.clear(); // and clear our own list
+ deadNodes.clear(); // and clear our own list
}
try {
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java?rev=1708149&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java Mon Oct 12 16:10:55 2015
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.common.persistence.rm;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+public interface IRmPersistence
+{
+ /**
+ * Establish a logger and anything else the persistence may need.
+ *
+ * @param logger This is the logger to be used. It is usually
+ * the same logger as the client of persistence, e.g.
+ * org.apache.uima.ducc.rm. The implementor is required
+ * to adjust itself to use this logger to ensure
+ * messages are logged into the right log.
+ */
+ public void init(DuccLogger logger) throws Exception;
+
+ /**
+ * When RM performs its configuration it must call this to clear the db of existing
+ * nodes. As nodes rejoin they'll be added back. This is consistent with the RM's
+ * internal management, which also drops its structures and rebuilds them on (re)configuration.
+ */
+ public void clear() throws Exception;
+
+ /**
+ * Set multiple properties in one swell foop.
+ *
+ * @param id This is the primary key, the machine name;
+ * @param properties These are the props, must be presented in the form of (String, Object) ...
+ */
+ public void setProperties(Object dbid, String id, Object... properties) throws Exception;
+
+ /**
+ * Set a property on an object. If the property cannot be set the action
+ * is rolled back and the underlying store is unchanged.
+ *
+ * @param id This is the primary key and is usually the name of a host.
+ * @param key This is the property key.
+ * @param value This is the value to set.
+ *
+ * @throws Exception. Anything that goes wrong throws. Usually the
+ * throw will originate in the DB because of some DB issue. An
+ * exception causes the action to be rolled back.
+ */
+ public void setProperty(Object dbid, String id, RmPropName key, Object value) throws Exception;
+
+
+ /**
+ * Write full information about a machine into the DB. We assume the machine
+ * does not exist but in case it does, it is fully deleted first, and then
+ * re-saved. If the record cannot be saved the action
+ * is rolled back and the underlying store is unchanged.
+ *
+ * @param id This is the primary key and is usually the name of a host.
+ * @param props This is the full set of properties to be set.
+ *
+ * @throws Exception. Anything that goes wrong throws. Usually the
+ * throw will originate in the DB because of some DB issue. An
+ * exception causes the action to be rolled back.
+ *
+ * @return The db id of the created machine.
+ */
+ public Object createMachine(String id, Properties props) throws Exception;
+
+ /**
+ * Fetch a machine by its id.
+ *
+ * @param id This is the name of a specific machine and must exactly
+ * match the name of a machine in the DB.
+ *
+ * @return A properties object containing full details about the machine, or
+ * null if no such machine exists.
+ *
+ * @throws Exception. Anything that goes wrong throws. Usually the
+ * throw will originate in the DB because of some DB issue.
+ */
+ public Properties getMachine(String id) throws Exception;
+
+ /**
+ * Fetch all machines in the database.
+ *
+ * @return A map of properties objects containing full details about the machines,
+ * keyed on machine name. If there are no machines found in the db,
+ * an empty map is returned.
+ *
+ * @throws Exception. Anything that goes wrong throws. Usually the
+ * throw will originate in the DB because of some DB issue.
+ */
+ public Map<String, Properties> getAllMachines() throws Exception;
+
+ public String toGson(Object o);
+
+ public interface RmProps
+ {
+ String pname();
+ }
+
+ enum RmPropName
+ implements RmProps
+ {
+ Name {
+ public String pname() { return "name"; }
+ },
+ Responsive{
+ public String pname() { return "responsive"; }
+ },
+ Online{
+ public String pname() { return "online"; }
+ },
+ HeartBeats {
+ public String pname() { return "heartbeats"; }
+ },
+ Ip {
+ public String pname() { return "ip"; }
+ },
+ Nodepool {
+ public String pname() { return "nodepool"; }
+ },
+ Quantum {
+ public String pname() { return "quantum"; }
+ },
+ Memory {
+ public String pname() { return "memory"; }
+ },
+ ShareOrder {
+ public String pname() { return "share_order"; }
+ },
+ Shares{
+ public String pname() { return "shares"; }
+ },
+ Blacklisted {
+ public String pname() { return "blacklisted"; }
+ },
+ Heartbeats {
+ public String pname() { return "heartbeats"; }
+ },
+ SharesLeft {
+ public String pname() { return "shares_left"; }
+ },
+ Assignments {
+ public String pname() { return "assignments"; }
+ },
+ ;
+ }
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java?rev=1708149&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java Mon Oct 12 16:10:55 2015
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.persistence.rm;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+
+public class NullRmStatePersistence implements IRmPersistence
+{
+
+ NullRmStatePersistence() {
+ }
+
+ public void init(DuccLogger logger) throws Exception { }
+ public void clear() {}
+ public void setProperty(Object dbid, String id, RmPropName key, Object value) { }
+ public void setProperties(Object dbid, String id, Object... props) {}
+ public Object createMachine(String id, Properties props) { return new Integer(1);}
+ public Properties getMachine(String id) { return null; }
+ public Map<String, Properties> getAllMachines() { return new HashMap<String, Properties>(); }
+ public String toGson(Object o) { return ""; }
+
+}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/RmPersistenceFactory.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/RmPersistenceFactory.java?rev=1708149&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/RmPersistenceFactory.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/RmPersistenceFactory.java Mon Oct 12 16:10:55 2015
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.persistence.rm;
+
+import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+
+
+public class RmPersistenceFactory
+{
+ private static IRmPersistence instance = null;
+
+ private static IRmPersistence getInstanceInternal(String callerClass, String component)
+ {
+ String methodName = "getInstance";
+ // log4j logging annoyance. We require the caller to give us its base package so
+ // we can configure a logger that writes to the right appender
+ // log4j logging annoyance. We require the caller to give us its base package so
+ // we can configure a logger that writes to the right appender
+ int ndx = callerClass.lastIndexOf(".");
+ String stem = callerClass.substring(0, ndx);
+
+ String clname = System.getProperty("ducc.rm.persistence.impl");
+ if ( clname == null ) {
+ DuccLogger logger = DuccService.getDuccLogger();
+ logger.warn(methodName, null, "RM persistence manager is not configured. Returning null instance.");
+ return new NullRmStatePersistence();
+ }
+ ndx = clname.lastIndexOf(".");
+ String clfile = clname.substring(ndx+1);
+ //
+ // We try to construct the persistence object. If it fails, we return a
+ // "null" object conforming to the interface but doing nothing to hopefully
+ // reduce NPEs.
+ //
+ DuccLogger logger = DuccLogger.getLogger(stem + "." + clfile, "DB"); // get the component logger
+
+ IRmPersistence ret = null;
+ try {
+ @SuppressWarnings("unchecked")
+ Class<IRmPersistence> iss = (Class<IRmPersistence>) Class.forName(clname);
+ ret = (IRmPersistence) iss.newInstance();
+ ret.init(logger);
+ } catch ( Throwable t ) {
+ logger.error(methodName, null, "Cannot instantiate RM persistence class", clname, ":", t);
+ ret = new NullRmStatePersistence();
+ }
+
+ return ret;
+ }
+
+ public static IRmPersistence getInstance(String callerClass, String component)
+ {
+ synchronized(RmPersistenceFactory.class) {
+ if ( instance == null ) {
+ instance = getInstanceInternal(callerClass, component);
+ }
+
+ return instance;
+ }
+ }
+
+}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java Mon Oct 12 16:10:55 2015
@@ -49,6 +49,8 @@ public interface IStateServices {
implementors { public String pname() { return "implementors"; } },
numeric_id { public String pname() { return "numeric_id"; } },
+ uuid { public String pname() { return "uuid"; } },
+ service_seqno { public String pname() { return "service.seqno"; } },
ping_active { public String pname() { return "ping_active"; } },
ping_only { public String pname() { return "ping_only"; } },
@@ -79,6 +81,7 @@ public interface IStateServices {
last_runnable_readable { public String pname() { return "last_runnable_readable"; } },
work_instances { public String pname() { return "work_instances"; } },
registration_date { public String pname() { return "registration_date"; } },
+ registration_date_millis { public String pname() { return "registration_date_millis"; } },
instance_init_failures_limit { public String pname() { return "instance_init_failures_limit"; } },
@@ -140,8 +143,8 @@ public interface IStateServices {
public StateServicesDirectory getStateServicesDirectory() throws Exception; // all the registy in one blow
public boolean storeProperties (DuccId serviceId, Properties svc, Properties meta) throws Exception; // save svc and meta in a transaction
- public boolean updateJobProperties (DuccId serviceId, Properties props) throws Exception; // update just job props
- public boolean updateMetaProperties(DuccId serviceId, Properties props) throws Exception; // update just metaprops
+ public boolean updateJobProperties (Object dbid, DuccId serviceId, Properties props) throws Exception; // update just job props
+ public boolean updateMetaProperties(Object dbid, DuccId serviceId, Properties props) throws Exception; // update just metaprops
public void moveToHistory(DuccId serviceId, Properties svc, Properties meta) throws Exception;
public void shutdown() throws Exception;
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/NullStateServices.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/NullStateServices.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/NullStateServices.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/NullStateServices.java Mon Oct 12 16:10:55 2015
@@ -54,12 +54,12 @@ public class NullStateServices implement
}
public boolean storeProperties(DuccId serviceId, Properties svc, Properties meta) throws Exception {return false;}
- public boolean updateProperties(DuccId serviceId, String type, Properties props) throws Exception {return false;}
+ public boolean updateProperties(Object dbid, DuccId serviceId, String type, Properties props) throws Exception {return false;}
public void deleteProperties(DuccId serviceId) throws Exception {}
public void shutdown() throws Exception {}
public void moveToHHistory() throws Exception {}
- public boolean updateJobProperties(DuccId serviceId, Properties props) throws Exception {return false;}
- public boolean updateMetaProperties(DuccId serviceId, Properties props) throws Exception {return false;}
+ public boolean updateJobProperties(Object dbid, DuccId serviceId, Properties props) throws Exception {return false;}
+ public boolean updateMetaProperties(Object dbid, DuccId serviceId, Properties props) throws Exception {return false;}
public void moveToHistory(DuccId serviceId, Properties svc, Properties meta) throws Exception {}
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServices.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServices.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServices.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServices.java Mon Oct 12 16:10:55 2015
@@ -237,21 +237,21 @@ public class StateServices implements IS
return ok;
}
- private boolean updateProperties(DuccId serviceId, Properties props, String type)
+ private boolean updateProperties(Object dbid, DuccId serviceId, Properties props, String type)
{
File f = new File(mkfilename(serviceId, type));
File tmpf = new File(f.toString() + ".tmp");
return saveProperties(serviceId, props, f, tmpf, type);
}
- public boolean updateJobProperties(DuccId serviceId, Properties props)
+ public boolean updateJobProperties(Object dbid, DuccId serviceId, Properties props)
{
- return updateProperties(serviceId, props, svc);
+ return updateProperties(dbid, serviceId, props, svc);
}
- public boolean updateMetaProperties(DuccId serviceId, Properties props)
+ public boolean updateMetaProperties(Object dbid, DuccId serviceId, Properties props)
{
- return updateProperties(serviceId, props, meta);
+ return updateProperties(dbid, serviceId, props, meta);
}
public void deleteProperties(long serviceId)
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java Mon Oct 12 16:10:55 2015
@@ -19,6 +19,8 @@
package org.apache.uima.ducc.database;
+import com.orientechnologies.orient.core.metadata.schema.OType;
+
/**
* This enum defines the classes and constants used in the db schema
*/
@@ -26,16 +28,51 @@ package org.apache.uima.ducc.database;
public interface DbConstants
{
- static final String DUCCID = "ducc_dbid"; // DB-unique name for the duccid
- static final String DUCC_DBCAT = "ducc_dbcat"; // The ducc database category: history, checkpoint, sm registry
+ static final String DUCCID = "ducc_dbid"; // DB-unique name for the duccid
+ static final String DUCC_DBCAT = "ducc_dbcat"; // The ducc database category: history, checkpoint, sm registry
+ static final String DUCC_DBNODE = "ducc_dbnode";
+
+ // for all vertices we need to know the base and the name
+ public interface VSchema
+ {
+ String pname(); // the name of the ODB class
+ VSchema parent(); // parent class, if any. if none, then V
+ Index[] indices(); // indices to define on the class
+ }
- public interface Schema
+ // for all indices we need to know the name, the base class, the property, and the type
+ public interface Index
{
- String pname();
+ String pname(); // index name
+ String propname(); // name of the property it is applied to, must exist in base
+ OType type(); // datatype
+ }
+
+ public enum DuccIndex
+ implements Index
+ {
+ IDuccId {
+ public String pname() { return "i_duccid"; }
+ public String propname() { return DUCCID; }
+ public OType type() { return OType.LONG; }
+ },
+
+ ICategory {
+ public String pname() { return "i_category"; }
+ public String propname() { return DUCC_DBCAT; }
+ public OType type() { return OType.STRING; }
+ },
+
+ INodeName {
+ public String pname() { return "i_nodename"; }
+ public String propname() { return DUCC_DBNODE; }
+ public OType type() { return OType.STRING; }
+ },
+ ;
+
}
public enum DbCategory
- implements Schema
{
Any {
// All categories - don't qualify the search
@@ -53,27 +90,40 @@ public interface DbConstants
// Active service registration
public String pname() { return "smreg"; }
},
-
- ;
+ RmState {
+ // RM transient state. Nodes, shares, etc.
+ public String pname() { return "rmstate"; }
+ },
+ ;
+ public abstract String pname();
+
}
- // Every vertex must inherit from here so we can use common indexes
- public enum DuccVertexBase
- implements Schema
+ public enum DbVertex
+ implements VSchema
{
VBase {
- public String pname() { return "VDuccBase"; }
+ public String pname() { return "VBase"; }
+ public DbVertex parent() { return null; }
+ public Index[] indices() { return new Index[] { DuccIndex.ICategory }; }
+ },
+
+ VWork {
+ public String pname() { return "VWork"; }
+ public DbVertex parent() { return VBase; }
+ public Index[] indices() { return new Index[] { DuccIndex.IDuccId } ; }
+ },
+
+ RmNode {
+ public String pname() { return "VRmNode"; }
+ public DbVertex parent() { return VBase; }
+ public Index[] indices() { return new Index[] { DuccIndex.INodeName }; }
},
- ;
- }
- public enum DbVertex
- implements Schema
- {
//
// The convention is for vertices to start with Capital V and then a Capital
//
- Job { // The serialized job instance from OR
+ Job {
public String pname() { return "VJob"; }
},
@@ -112,22 +162,20 @@ public interface DbConstants
ProcessToJob { // For checkpoints, the process - to - job id map
public String pname() { return "VProcessToJob"; }
},
-
;
+ public DbVertex parent() { return VWork; }
+ public Index[] indices() { return null; }
}
- public enum DuccEdgeBase
- implements Schema
+ public interface ESchema
{
- EdgeBase {
- public String pname() { return "ducc_ebase"; }
- },
- ;
+ String pname(); // the name of the ODB class
+ ESchema parent();
}
public enum DbEdge
- implements Schema
+ implements ESchema
{
//
// The convention is for edges to start with lower e and then a lower
@@ -136,6 +184,11 @@ public interface DbConstants
// public String pname() { return "ducc_edge"; }
// },
+ EBase {
+ public String pname() { return "ducc_ebase"; }
+ public ESchema parent() { return null; }
+ },
+
Classpath { // All record types, detached classpath
public String pname() { return "eclasspath"; }
},
@@ -154,5 +207,7 @@ public interface DbConstants
;
+ public ESchema parent() { return EBase; }
+
}
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java Mon Oct 12 16:10:55 2015
@@ -22,11 +22,10 @@ package org.apache.uima.ducc.database;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.database.DbConstants.DbEdge;
import org.apache.uima.ducc.database.DbConstants.DbVertex;
-import org.apache.uima.ducc.database.DbConstants.DuccVertexBase;
+import org.apache.uima.ducc.database.DbConstants.Index;
import com.orientechnologies.orient.client.remote.OServerAdmin;
import com.orientechnologies.orient.core.metadata.schema.OProperty;
-import com.orientechnologies.orient.core.metadata.schema.OType;
import com.orientechnologies.orient.core.sql.OCommandSQL;
import com.tinkerpop.blueprints.impls.orient.OrientEdgeType;
import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory;
@@ -94,51 +93,134 @@ public class DbCreate
}
}
- void createVertexType(OrientGraphNoTx g, DbVertex id)
- {
- String methodName = "createVertexType";
- String s = id.pname();
- OrientVertexType e = g.getVertexType(s);
- if ( e == null ) {
- doLog(methodName, "Create vertex " + s);
- e = g.createVertexType(s, DuccVertexBase.VBase.pname());
+// void createVertexType(OrientGraphNoTx g, DbVertex id)
+// {
+// String methodName = "createVertexType";
+// String s = id.pname();
+// OrientVertexType e = g.getVertexType(s);
+// if ( e == null ) {
+// doLog(methodName, "Create vertex " + s);
+// e = g.createVertexType(s, DuccVertexBase.VBase.pname());
+// }
+// }
+
+ void createVertices(OrientGraphNoTx g)
+ {
+ String methodName = "createVertices";
+
+ for (DbVertex v : DbVertex.values()) {
+ String s = v.pname();
+ OrientVertexType vt = g.getVertexType(s);
+
+
+ if ( vt == null ) {
+ if ( v.parent() == null ) {
+ vt = g.createVertexType(s);
+ doLog(methodName, "Create vertex " + s);
+ } else {
+ vt = g.createVertexType(s, v.parent().pname());
+ doLog(methodName, "Create vertex ", s, "subclass from", v.parent().pname());
+ }
+
+ Index[] ndx = v.indices();
+ if ( ndx != null ) {
+ for ( Index i : ndx ) {
+ OProperty p = vt.createProperty(i.propname(), i.type());
+ p.setMandatory(true);
+ doLog(methodName, "Create property", i.propname(), "on", s);
+ }
+ }
+
+ // String sql = "create index " + i.pname() + " on " + s + "(" + i.propname() + " by key) notunique";
+ // g.command(new OCommandSQL(sql)).execute();
+ // doLog(methodName, "(sql)Created index", i.pname(), "on class", s, "for", i.propname());
+ // //g.createKeyIndex(i.propname(), Vertex.class, new Parameter("type", "NOTUNIQUE_HASH_INDEX"));
+ // doLog(methodName, "(java)Created index on class Vertex for", i.propname());
+ // }
+ // }
+
+ }
}
+
+ if ( true ) return;
+ String sql = "create index D.VBASE_0 on VWork(ducc_dbid, ducc_dbcat) NOTUNIQUE";
+ g.command(new OCommandSQL(sql)).execute();
+
+ String sql1 = "create index D.VBASE_ID on VWork(ducc_dbid) NOTUNIQUE";
+ g.command(new OCommandSQL(sql1)).execute();
+
+ String sql2 = "create index D.VBASE_CAT on VWork(ducc_dbcat) NOTUNIQUE";
+ g.command(new OCommandSQL(sql2)).execute();
+
+ String sql3 = "create index D.NODE on VRmNode(ducc_dbnode) NOTUNIQUE";
+ g.command(new OCommandSQL(sql3)).execute();
+
}
+ void createEdges(OrientGraphNoTx g)
+ {
+ String methodName = "createEdges";
+ for (DbEdge e : DbEdge.values()) {
+ String s = e.pname();
+ OrientEdgeType et = g.getEdgeType(s);
+ if ( et == null ) {
+ if ( e.parent() == null ) {
+ et = g.createEdgeType(s);
+ doLog(methodName, "Create edge " + s);
+ } else {
+ et = g.createEdgeType(s, e.parent().pname());
+ doLog(methodName, "Create edge ", s, "subclass from", e.parent().pname());
+ }
+ }
+ }
+ }
+
void createSchema()
{
- String methodName = "createSchema";
OrientGraphNoTx g = factory.getNoTx();
- String base = DuccVertexBase.VBase.pname();
- OrientVertexType e = g.getVertexType(base);
- if ( e == null ) {
- doLog(methodName, "Create base vertex class " + base);
- e = g.createVertexType(base);
- OProperty p = e.createProperty(DbConstants.DUCCID, OType.LONG);
- p.setMandatory(true);
- OProperty p2 = e.createProperty(DbConstants.DUCC_DBCAT, OType.STRING);
- p2.setMandatory(true);
-
- String sql = "create index i_ducc_dbid on " + base + "(" + DbConstants.DUCCID + ") notunique";
- g.command(new OCommandSQL(sql)).execute();
- doLog(methodName, "(sql)Created index i_ducc_dbid on class " + base + " for " + DbConstants.DUCCID);
-
- sql = "create index i_ducc_dbcat on " + base + "(" + DbConstants.DUCC_DBCAT + ") notunique";
- g.command(new OCommandSQL(sql)).execute();
- doLog(methodName, "(sql)Created index i_ducc_dbcat on class " + base + " for " + DbConstants.DUCC_DBCAT);
-
+ try {
+ createVertices(g);
+ createEdges(g);
+ } finally {
+ g.shutdown();
}
+ }
+
+ // void createSchemax()
+ // {
+ // String methodName = "createSchema";
+ // OrientGraphNoTx g = factory.getNoTx();
+
+ // String base = DuccVertexBase.VBase.pname();
+ // OrientVertexType e = g.getVertexType(base);
+ // if ( e == null ) {
+ // doLog(methodName, "Create base vertex class " + base);
+ // e = g.createVertexType(base);
+ // OProperty p = e.createProperty(DbConstants.DUCCID, OType.LONG);
+ // p.setMandatory(true);
+ // OProperty p2 = e.createProperty(DbConstants.DUCC_DBCAT, OType.STRING);
+ // p2.setMandatory(true);
+
+ // String sql = "create index i_ducc_dbid on " + base + "(" + DbConstants.DUCCID + ") notunique";
+ // g.command(new OCommandSQL(sql)).execute();
+ // doLog(methodName, "(sql)Created index i_ducc_dbid on class " + base + " for " + DbConstants.DUCCID);
+
+ // sql = "create index i_ducc_dbcat on " + base + "(" + DbConstants.DUCC_DBCAT + ") notunique";
+ // g.command(new OCommandSQL(sql)).execute();
+ // doLog(methodName, "(sql)Created index i_ducc_dbcat on class " + base + " for " + DbConstants.DUCC_DBCAT);
+
+ // }
- for ( DbVertex o : DbVertex.values() ) {
- createVertexType(g, o);
- }
- for ( DbEdge o : DbEdge.values() ) {
- createEdgeType(g, o);
- }
+ // for ( DbVertex o : DbVertex.values() ) {
+ // createVertexType(g, o);
+ // }
+ // for ( DbEdge o : DbEdge.values() ) {
+ // createEdgeType(g, o);
+ // }
- g.shutdown();
- }
+ // g.shutdown();
+ // }
boolean createPlocalDatabase()
throws Exception
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java Mon Oct 12 16:10:55 2015
@@ -18,20 +18,59 @@
*/
package org.apache.uima.ducc.database;
+import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.uima.ducc.common.DuccNode;
+import org.apache.uima.ducc.common.IIdentity;
+import org.apache.uima.ducc.common.Node;
+import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.main.DuccService;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.common.utils.id.IDuccId;
import org.apache.uima.ducc.database.DbConstants.DbCategory;
import org.apache.uima.ducc.database.DbConstants.DbEdge;
import org.apache.uima.ducc.database.DbConstants.DbVertex;
-
+import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.event.common.IDuccPerWorkItemStatistics;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessWorkItems;
+import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
+import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo;
+import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaAggregateComponent;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaDeployableConfiguration;
+import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.ITimeWindow;
+import org.apache.uima.ducc.transport.event.common.JdReservationBean;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.InstanceCreator;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.sql.OCommandSQL;
import com.tinkerpop.blueprints.Direction;
@@ -193,6 +232,37 @@ public class DbHandle
return ret;
}
+ public Map<Long, Properties> getPropertiesForTypeSel(DbVertex type, DbCategory dbcat)
+ throws Exception
+ {
+ String methodName = "getPropertiesForType";
+
+ Iterable<Vertex> vs = null;
+ if ( dbcat == DbCategory.Any ) {
+ vs = graphDb.getVertices(type.pname(),
+ new String[] {"@class"},
+ new Object[]{type.pname()});
+
+ } else {
+ vs = select("SELECT FROM " + type.pname() + " WHERE " + DbConstants.DUCC_DBCAT + "='" + dbcat.pname()+"'");
+ }
+
+ Map<Long, Properties> ret = new HashMap<Long, Properties>();
+
+ try {
+ for ( Vertex v : vs ) {
+ OrientVertex ov = (OrientVertex) v;
+ Properties props = vertexToProps(ov);
+ Long did = ov.getProperty(DbConstants.DUCCID);
+ ret.put(did, props);
+ }
+ } catch (Throwable e) {
+ logger.error(methodName, null, "Database access error: ", e);
+ }
+
+ return ret;
+ }
+
/**
* Use this for selecting, it returns a set of stuff
*/
@@ -200,7 +270,10 @@ public class DbHandle
throws Exception
{
String methodName = "select";
+ long now = System.currentTimeMillis();
logger.info(methodName, null, "SQL", sql);
+ logger.info(methodName, null, "Time to select", System.currentTimeMillis() - now);
+
return graphDb.command(new OCommandSQL(sql)).execute();
}
@@ -210,13 +283,19 @@ public class DbHandle
public int execute(String sql)
{
String methodName = "execute";
+ long now = System.currentTimeMillis();
logger.info(methodName, null, "SQL", sql);
+ logger.info(methodName, null, "Time to execute", System.currentTimeMillis() - now);
+
return graphDb.command(new OCommandSQL(sql)).execute();
}
public void commit()
{
+ String methodName = "commit";
+ long now = System.currentTimeMillis();
if ( graphDb != null ) graphDb.commit();
+ logger.info(methodName, null, "Time to commit", System.currentTimeMillis() - now);
}
public void rollback()
@@ -260,7 +339,7 @@ public class DbHandle
* @param obj The json-ified object to save
* @param isHistory 'true' if we save to history, 'false' otherwise
*/
- public Object saveObject(DbVertex type, Long duccid, String obj, DbCategory dbcat)
+ public OrientVertex saveObject(DbVertex type, Long duccid, String obj, DbCategory dbcat)
{
//String methodName = "saveObject";
@@ -350,7 +429,7 @@ public class DbHandle
* @param to a list of the things the source is pointing/linking to
* @param type The type enum for edge, e.g. 'process'
*/
- public void addEdges(Object from, List<Object> to, DbEdge type)
+ public void addEdges(OrientVertex from, List<OrientVertex> to, DbEdge type)
{
//String methodName = "addEdges";
OrientVertex fv = (OrientVertex) from;
@@ -372,7 +451,7 @@ public class DbHandle
* @param a single object to point to
* @param type The type enum for edge, e.g. 'process'
*/
- public void addEdge(Object from, Object to, DbEdge type)
+ public void addEdge(OrientVertex from, OrientVertex to, DbEdge type)
{
//String methodName = "addEdges";
OrientVertex fv = (OrientVertex) from;
@@ -436,52 +515,93 @@ public class DbHandle
// }
+ public void changeCategory(OrientVertex obj, DbCategory to)
+ {
+ String methodName = "changeCategory";
+ Long id = obj.getProperty(DbConstants.DUCCID);
+ String oldcat = obj.getProperty(DbConstants.DUCC_DBCAT);
+ String type = obj.getProperty("@CLASS");
+ logger.info(methodName, null, "Chnage category obj of type", type, "id", id, "from", oldcat, "to", to.pname());
+ obj.setProperty(DbConstants.DUCC_DBCAT, to.pname());
+ }
+
/**
- * Create an object in the db from a properties object.
+ * Update a single property. The object must be unique and must already exist.
*
+ * @param keyid DUCCID or DUCC_DBNODE usually. It's the name of the field with the primary key.
+ * @param key Holds the value of the primary key
+ * @param type The vertex type
+ * @param dbcat The category, e.g. history, or checkpoint, or rmstate
* @param props The properties object to be placed in the db.
- * @param type The type enum for the object, e.g. "Service"
- * @param duccid The numeric id of the object
- * @param isHistory 'True' if it is to be placed in history, 'false' otherwise.
*/
- public Object createPropertiesObject(Properties props, DbVertex type, Long duccid, DbCategory dbcat)
+ public OrientVertex updateProperty(String keyid, Object key, DbVertex type, DbCategory dbcat, String propkey, Object propval)
{
- // Note: caller must insure this is first time for this if he doesn't want a duplicate.
- // by calling thingInDatabase().
-
- //String methodName = "createPropertiesObject";
- String typeName = type.pname();
+ String methodName = "updateProperty";
+ long now = System.currentTimeMillis();
+ Iterable<Vertex> s = graphDb.getVertices(type.pname(), new String[] {"@class", DbConstants.DUCC_DBCAT, keyid}, new Object[]{type.pname(), dbcat.pname(), key});
+ logger.info(methodName, null, "Time to search on " + type.pname() + " where category " + dbcat.pname() + " and " + keyid + " propkey " + key, System.currentTimeMillis() - now);
+
OrientVertex ov = null;
+ int count = 0; // some sanity checking, we're not allowed more than one
+ for (Vertex v : s) {
+ ov = (OrientVertex) v;
+ count++;
+ }
+
+ if ( count > 1 ) {
+ throw new IllegalStateException("Duplocate object in db: Type " + type.pname() + " category " + dbcat.pname() + " key " + key + " propkey " + propkey + " propval " + propval);
+ }
+
+ if ( ov == null ) {
+ throw new IllegalStateException("No object in db: Type " + type.pname() + " category " + dbcat.pname() + " key " + key + " propkey " + propkey + " propval " + propval);
+ }
+
+
+ ov.setProperty(propkey, propval);
+ ov.save();
+ logger.info(methodName, null, "Time to update one property", System.currentTimeMillis() - now);
- // logger.info(methodName, null, duccid, "Create new db record of type", typeName);
- ov = graphDb.addVertex("class:" + typeName, DbConstants.DUCCID, duccid, DbConstants.DUCC_DBCAT, dbcat.pname());
- ov.setProperties(props);
return ov;
}
- /**
- * Use the incoming properties to set the properties on the object of given type and duccid.
- * Rules:
- * 1. If the object does not exist in the db, add it with no properties.
- * 2. If the property exists in both, update the value.
- * 3. If the property exists only in db object, delete from the db object.
- * 4. If the property exists only in input, add to db object.
- * 5. Caller must commit, allowing for multiple things in a transaction
- * @param props The propertiess to sync with
- * @param type The type of object to update (e.g. Service, ServiceMeta, Job etc)
- * @param duccid the duccid of the object
- * @param isHistory 'True' if the object is to be placed in history, 'false' otherwise
- */
- public void syncProperties(Properties props, DbVertex type, Long duccid, DbCategory dbcat)
- throws Exception
+
+ public OrientVertex updateProperty(Object dbid, String propkey, Object propval)
{
- //String methodName = "syncProperties";
+ String methodName = "updateProperty";
+ long now = System.currentTimeMillis();
+
+ OrientVertex ov = graphDb.getVertex(dbid);
+ logger.info(methodName, null, "Time to search on " + dbid.toString(), System.currentTimeMillis() - now);
+
+ if ( ov == null ) {
+ throw new IllegalStateException("No object in db: Id " + dbid.toString());
+ }
+
+ ov.setProperty(propkey, propval);
+ ov.save();
+ logger.info(methodName, null, "Time to update one property for", dbid.toString(), System.currentTimeMillis() - now);
+
+ return ov;
+ }
+
+ /**
+ * Update the properties on an object in the db with the incoming properties. The object must exist.
+ * There is no attempt to synchronize values - the new props either replace, or enrich the object.
+ *
+ * @param keyid DUCCID or DUCC_DBNODE usually. It's the name of the field with the primary key.
+ * @param key Holds the value of the primary key
+ * @param type The vertex type
+ * @param dbcat The category, e.g. history, or checkpoint, or rmstate
+ * @param props A list of (k,v) pairs which will replace their counterparts in the object.
+ */
+ public OrientVertex updateProperties(String keyid, Object key, DbVertex type, DbCategory dbcat, Object... props)
+ {
+ String methodName = "updateProperties";
+ long now = System.currentTimeMillis();
+ Iterable<Vertex> s = graphDb.getVertices(type.pname(), new String[] {"@class", DbConstants.DUCC_DBCAT, keyid}, new Object[]{type.pname(), dbcat.pname(), key});
+ logger.info(methodName, null, "Time to search on " + type.pname() + " where category " + dbcat.pname() + " and " + keyid + " propkey " + key, System.currentTimeMillis() - now);
- // The assumption is that only one object of the given DbVertex.type and duccid is allowed in the
- // database.
- Iterable<Vertex> s = graphDb.getVertices(type.pname(), new String[] {"@class", DbConstants.DUCCID}, new Object[]{type.pname(), duccid});
-
OrientVertex ov = null;
int count = 0; // some sanity checking, we're not allowed more than one
for (Vertex v : s) {
@@ -490,25 +610,391 @@ public class DbHandle
}
if ( count > 1 ) {
- throw new IllegalStateException("Multiple database records for " + type + "." + duccid);
- }
+ throw new IllegalStateException("updateProperties: Duplocate object in db: Type " + type.pname() + " category " + dbcat.pname() + " key " + keyid + " propkey " + key);
+ }
+
+ if ( ov == null ) {
+ throw new IllegalStateException("updateProperties: No object in db: Type " + type.pname() + " category " + dbcat.pname() + " key " + keyid + " propkey " + key);
+ }
+ logger.info(methodName, null, "Time to find a vertex", System.currentTimeMillis() - now);
+
+ long now2 = System.currentTimeMillis();
+ ov.setProperties(props);
+ ov.save();
+ logger.info(methodName, null, "Time to modify a vertex", System.currentTimeMillis() - now2);
+
+ logger.info(methodName, null, "Time to update several properties", System.currentTimeMillis() - now);
+
+ return ov;
+ }
+
+ public OrientVertex updateProperties(Object dbid, Object... props)
+ {
+ String methodName = "updateProperties";
+ long now = System.currentTimeMillis();
+
+
+ OrientVertex ov = graphDb.getVertex(dbid);
- if ( count == 0 ) {
- throw new IllegalStateException("No record found to update for " + type + "." + duccid);
+ if ( ov == null ) {
+ throw new IllegalStateException("updateProperties: No object in db: id " + dbid.toString());
}
+ logger.info(methodName, null, "Time to find a vertex", dbid.toString(), System.currentTimeMillis() - now);
+
+ long now2 = System.currentTimeMillis();
+ ov.setProperties(props);
+ ov.save();
+ logger.info(methodName, null, "Time to modify vertex", dbid.toString(), System.currentTimeMillis() - now2);
+
+ logger.info(methodName, null, "Total time to update vertex", dbid.toString(), System.currentTimeMillis() - now);
+
+ return ov;
+ }
+
+ // /**
+ // * Synchronize the properties in an object with the incoming properties. The object must be unique
+ // * and already exist.
+ // *
+ // * @param keyid DUCCID or DUCC_DBNODE usually. It's the name of the field with the primary key.
+ // * @param key Holds the value of the primary key
+ // * @param type The vertex type
+ // * @param dbcat The category, e.g. history, or checkpoint, or rmstate
+ // * @param props The properties object to be placed in the db.
+ // */
+ // public OrientVertex synchronizeProperties(String keyid, Object key, DbVertex type, DbCategory dbcat, Properties props)
+ // {
+ // String methodName = "synchronizeProperties";
+ // long now = System.currentTimeMillis();
- //logger.info(methodName, null, duccid, "Update record of type", type);
- Set<String> keys = ov.getPropertyKeys();
- for (String k : keys) { // (clear a property according to rule 3 above)
- if ( ! k.equals(DbConstants.DUCCID) && !props.containsKey(k) ) {
- ov.removeProperty(k);
+ // Iterable<Vertex> s = graphDb.getVertices(type.pname(), new String[] {"@class", DbConstants.DUCC_DBCAT, keyid}, new Object[]{type.pname(), dbcat.pname(), key});
+ // logger.info(methodName, null, "Time to search " + type.pname() + " category " + dbcat.pname() + " key " + key, System.currentTimeMillis() - now);
+
+
+ // long now2 = System.currentTimeMillis();
+
+ // OrientVertex ov = null;
+ // int count = 0; // some sanity checking, we're not allowed more than one
+ // for (Vertex v : s) {
+ // ov = (OrientVertex) v;
+ // count++;
+ // }
+
+ // if ( count > 1 ) {
+ // throw new IllegalStateException("updateProperties: Duplocate object in db: Type " + type.pname() + " category " + dbcat.pname() + " key " + key + " propkey " + key);
+ // }
+
+ // if ( ov == null ) {
+ // throw new IllegalStateException("updateProperties: No object in db: Type " + type.pname() + " category " + dbcat.pname() + " key " + key + " propkey " + key);
+ // }
+ // logger.info(methodName, null, "Time to itarate " + type.pname() + " category " + dbcat.pname() + " key " + key, System.currentTimeMillis() - now2);
+ // now2 = System.currentTimeMillis();
+
+ // logger.info(methodName, null, "Update db record of type", type.pname(), "category", dbcat.pname(), "key", key);
+ // Set<String> keys = ov.getPropertyKeys();
+ // for (String k : keys) {
+ // if ( k.equals(DbConstants.DUCCID) ) continue; // must bypass schema things
+ // if ( k.equals(DbConstants.DUCC_DBCAT) ) continue;
+ // if ( k.equals(DbConstants.DUCC_DBNODE) ) continue;
+
+ // Object val1 = ov.getProperty(k);
+ // Object val2 = props.get(k);
+
+ // if ( val2 == null ) { // the property is removed
+ // logger.info(methodName, null, "Removed property", k);
+ // ov.removeProperty(k);
+ // continue;
+ // }
+
+ // if ( !val2.equals(val1) ) { // replace/add a property. val2 is known not null
+ // logger.info(methodName, null, "Replaced/added property", k, "value", val1, "with", val2);
+ // ov.setProperty(k, val2);
+ // continue;
+ // }
+ // }
+
+ // ov.setProperty(keyid, key);
+ // ov.save();
+ // logger.info(methodName, null, "Time to update " + type.pname() + " category " + dbcat.pname() + " key " + key, System.currentTimeMillis() - now2);
+
+ // logger.info(methodName, null, "Time to synchronize properties", System.currentTimeMillis() - now);
+
+ // return ov;
+ // }
+
+ // public OrientVertex synchronizeProperties(Object dbid, Properties props)
+ // {
+ // String methodName = "synchronizeProperties";
+ // long now = System.currentTimeMillis();
+
+
+
+ // long now2 = System.currentTimeMillis();
+
+ // OrientVertex ov = graphDb.getVertex(dbid);
+ // if ( ov == null ) {
+ // throw new IllegalStateException("updateProperties: No object in db. Id " + dbid);
+ // }
+ // logger.info(methodName, null, "Time to search for ", dbid, System.currentTimeMillis() - now);
+
+ // now2 = System.currentTimeMillis();
+ // logger.info(methodName, null, "Update db record ", dbid);
+ // Set<String> keys = ov.getPropertyKeys();
+ // for (String k : keys) {
+ // if ( k.equals(DbConstants.DUCCID) ) continue; // must bypass schema things
+ // if ( k.equals(DbConstants.DUCC_DBCAT) ) continue;
+ // if ( k.equals(DbConstants.DUCC_DBNODE) ) continue;
+
+ // if ( k.equals("svc_dbid") ) continue;
+ // if ( k.equals("meta_dbid") ) continue;
+
+ // Object val1 = ov.getProperty(k);
+ // Object val2 = props.get(k);
+
+ // if ( val2 == null ) { // the property is removed
+ // logger.info(methodName, null, "Removed property", k);
+ // ov.removeProperty(k);
+ // continue;
+ // }
+
+ // if ( !val2.equals(val1) ) { // replace/add a property. val2 is known not null
+ // logger.info(methodName, null, "Replaced/added property", k, "value", val1, "with", val2);
+ // ov.setProperty(k, val2);
+ // continue;
+ // }
+ // }
+
+ // ov.save();
+ // logger.info(methodName, null, "Time to update record", dbid, System.currentTimeMillis() - now2);
+ // logger.info(methodName, null, "Total time to synchronize record", dbid, System.currentTimeMillis() - now);
+
+ // return ov;
+ // }
+
+ /**
+ * Create an object in the db from a properties object. The caller must do the checking to ensure the object already exists (or not, e.g. for a db loader).
+ *
+ * @param keyid DUCCID or DUCC_DBNODE usually. It's the name of the field with the primary key.
+ * @param key Holds the value of the primary key
+ * @param type The type enum for the object, e.g. "Service"
+ * @param dbcat The category, e.g. history, or checkpoint, or rmstate
+ * @param props The properties object to be placed in the db.
+ */
+ public OrientVertex createProperties(String keyid, Object key, DbVertex type, DbCategory dbcat, Properties props)
+ {
+ String methodName = "createPropertiesObject";
+ String typeName = type.pname();
+ OrientVertex ov = null;
+
+ logger.info(methodName, null, "Create new properties object of type", type.pname(), "category", dbcat.pname(), "key", key);
+ ov = graphDb.addVertex("class:" + typeName, keyid, key, DbConstants.DUCC_DBCAT, dbcat.pname());
+ ov.setProperties(props);
+ return ov;
+ }
+
+ static Gson gson = null;
+ static Gson mkGsonForJob()
+ {
+ synchronized(DbHandle.class) {
+ if ( gson != null ) return gson;
+
+ // We need to define Instance creators and such so we do it in a common place
+ GsonBuilder gb = new GsonBuilder();
+
+ GenericInterfaceAdapter customAdapter = new GenericInterfaceAdapter();
+ gb.serializeSpecialFloatingPointValues().setPrettyPrinting();
+ gb.enableComplexMapKeySerialization();
+
+ gb.registerTypeAdapter(Node.class, new NodeInstanceCreator());
+ gb.registerTypeAdapter(NodeIdentity.class, new NodeIdentityCreator());
+
+ //gb.registerTypeAdapter(IIdentity.class, new IdentityInstanceCreator());
+ gb.registerTypeAdapter(IIdentity.class, customAdapter);
+
+ gb.registerTypeAdapter(IDuccId.class, customAdapter);
+ gb.registerTypeAdapter(ICommandLine.class, customAdapter);
+ gb.registerTypeAdapter(ITimeWindow.class, customAdapter);
+ gb.registerTypeAdapter(IDuccProcessWorkItems.class, customAdapter);
+ gb.registerTypeAdapter(IDuccUimaAggregateComponent.class, customAdapter);
+ gb.registerTypeAdapter(IUimaPipelineAEComponent.class, customAdapter);
+ gb.registerTypeAdapter(IRationale.class, customAdapter);
+ gb.registerTypeAdapter(IDuccUimaDeployableConfiguration.class, customAdapter);
+ gb.registerTypeAdapter(IDuccStandardInfo.class, customAdapter);
+ gb.registerTypeAdapter(IDuccSchedulingInfo.class, customAdapter);
+ gb.registerTypeAdapter(IDuccPerWorkItemStatistics.class, customAdapter);
+ gb.registerTypeAdapter(IDuccReservationMap.class, customAdapter);
+ gb.registerTypeAdapter(JdReservationBean.class, customAdapter);
+
+ //ConcurrentHashMap<DuccId, Long> x = new ConcurrentHashMap<DuccId, Long>();
+ //gb.registerTypeAdapter(x.getClass(), new MapAdaptor());
+
+ //gb.registerTypeAdapterFactory(new DuccTypeFactory());
+ //Object obj = new ArrayList<IJdReservation>();
+ //gb.registerTypeAdapter(obj.getClass(), customAdapter);
+ Gson g = gb.create();
+ return g;
+ }
+ }
+
+ // ----------------------------------------------------------------------------------------------------
+ // Instance creators and adaptors for GSON
+ // ----------------------------------------------------------------------------------------------------
+
+ // We need these for the DuccNode and NodeIdentity because they don't have no-arg
+ // Constructors.
+ //
+ // @TODO after merge, consult with Jerry about adding in those constructors
+ private static class NodeInstanceCreator implements InstanceCreator<Node> {
+ public Node createInstance(Type type) {
+ // System.out.println("DuccNode");
+ return new DuccNode(null, null, false);
+ }
+ }
+
+ private static class NodeIdentityCreator implements InstanceCreator<NodeIdentity> {
+ public NodeIdentity createInstance(Type type) {
+ // System.out.println("DuccNodeIdentity");
+ try { return new NodeIdentity(null, null); } catch ( Exception e ) {}
+ return null;
+ }
+ }
+
+ /**
+ * JSON helper for our complex objects. Gson doesn't save type information in the json so
+ * it doesn't know how to construct things declared as interfaces.
+ *
+ * This class is a Gson adapter that saves the actual object type in the json on serialization,
+ * and uses that information on deserialization to construct the right thing.
+ */
+ private static class GenericInterfaceAdapter
+ implements
+ JsonSerializer<Object>,
+ JsonDeserializer<Object>
+ {
+
+ private static final String DUCC_META_CLASS = "DUCC_META_CLASS";
+
+ @Override
+ public Object deserialize(JsonElement jsonElement,
+ Type type,
+ JsonDeserializationContext jsonDeserializationContext)
+ throws JsonParseException
+ {
+ // Reconstitute the "right" class based on the actual class it came from as
+ // found in metadata
+ JsonObject obj = jsonElement.getAsJsonObject();
+ JsonElement clElem= obj.get(DUCC_META_CLASS);
+
+ if ( clElem== null ) {
+ throw new IllegalStateException("Cannot determine concrete class for " + type + ". Must register explicit type adapter for it.");
}
+ String clName = clElem.getAsString();
+
+ //System.out.println("----- elem: " + clName + " clElem: " + obj);
+ try {
+ Class<?> clz = Class.forName(clName);
+ return jsonDeserializationContext.deserialize(jsonElement, clz);
+ } catch (ClassNotFoundException e) {
+ throw new JsonParseException(e);
+ }
+ }
+
+ @Override
+ public JsonElement serialize(Object object,
+ Type type,
+ JsonSerializationContext jsonSerializationContext)
+ {
+ // Add the meta element indicating which concrete class this object came from
+ //String n = object.getClass().getCanonicalName();
+ //System.out.println("**** Serialize object A " + n + " of type " + type);
+ //if ( n.contains("Concurrent") ) {
+ // int stop = 1;
+ // stop++;
+ //}
+
+ JsonElement ele = jsonSerializationContext.serialize(object, object.getClass());
+ //System.out.println("**** Serialize object B " + object.getClass().getCanonicalName() + " of type " + type + " : ele " + ele);
+ ele.getAsJsonObject().addProperty(DUCC_META_CLASS, object.getClass().getCanonicalName());
+ return ele;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private class DuccTypeFactory
+ implements TypeAdapterFactory
+ {
+
+ public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> typeToken)
+ {
+ //System.out.println("TYPETOKEN: " + typeToken + " raw type: " + typeToken.getRawType().getName());
+ Class<?> cl = typeToken.getRawType();
+ //System.out.println(" Canonical name: " + cl.getCanonicalName());
+ Type type = typeToken.getType();
+ if ( typeToken.getRawType() != ConcurrentHashMap.class ) {
+ //System.out.println("Skipping type " + typeToken);
+ return null;
+ }
+
+ if ( type instanceof ParameterizedType ) {
+
+ ParameterizedType pt = (ParameterizedType) type;
+ Type[] types = pt.getActualTypeArguments();
+ //for ( Type tt : types ) {
+ // System.out.println(" TYPE ARGUMENTS: " + tt);
+ //}
+ Type tt = types[0];
+ Class<?> cll = (Class<?>) tt;
+
+ }
+ return null;
}
- ov.setProperties(props); // handles both rules 2 and 4
- ov.setProperty(DbConstants.DUCC_DBCAT, dbcat.pname());
- //graphDb.getRawGraph().save(ov.getRecord());
- ov.save();
}
+ @SuppressWarnings("unused")
+ private class MapAdaptor
+ extends TypeAdapter<ConcurrentHashMap<DuccId, Long>>
+ {
+
+ public void write(JsonWriter out, ConcurrentHashMap<DuccId, Long> map) throws IOException {
+ System.out.println("***************** Writing");
+ if (map == null) {
+ out.nullValue();
+ return;
+ }
+
+ out.beginArray();
+ for (DuccId k : map.keySet() ) {
+ out.beginObject();
+ out.value(k.getFriendly());
+ out.value(k.getUnique());
+ out.value(map.get(k));
+ out.endObject();
+ }
+ out.endArray();
+ }
+
+ public ConcurrentHashMap<DuccId, Long> read(JsonReader in) throws IOException {
+ System.out.println("***************** reading");
+ if (in.peek() == JsonToken.NULL) {
+ in.nextNull();
+ return null;
+ }
+
+ ConcurrentHashMap<DuccId, Long> ret = new ConcurrentHashMap<DuccId, Long>();
+ in.beginArray();
+ while (in.hasNext()) {
+ in.beginObject();
+ Long friendly = in.nextLong();
+ String unique = in.nextString();
+
+ Long val = in.nextLong();
+ in.endObject();
+ DuccId id = new DuccId(friendly);
+ id.setUUID(UUID.fromString(unique));
+ ret.put(id, val);
+ }
+ in.endArray();
+ return ret;
+ }
+ }
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbListLoader.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbListLoader.java?rev=1708149&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbListLoader.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbListLoader.java Mon Oct 12 16:10:55 2015
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.database;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
+
+/**
+ * Toy orientdb loader to load a historydb from ducc history
+ */
+
+public class DbListLoader
+{
+ DuccLogger logger = DuccLogger.getLogger(DbLoader.class, "DBLOAD");
+ String DUCC_HOME;
+
+ HistoryManagerDb hmd = null;
+ StateServicesDb ssd = null;
+
+ // String history_url = "remote:localhost/DuccHistory";
+ // String state_url = "plocal:/home/challngr/ducc_runtime_db/database/databases/DuccHistoryT";
+ String state_url = null;
+ String input_list = null;
+ int nthreads = 20;
+ AtomicInteger counter = new AtomicInteger(0);
+
+
+ public DbListLoader(String from, String to)
+ throws Exception
+ {
+ //String methodName = "<ctr>";
+ DUCC_HOME = System.getProperty("DUCC_HOME");
+ if ( DUCC_HOME == null ) {
+ System.out.println("System proprety -DDUCC_HOME must be set.");
+ System.exit(1);
+ }
+
+ File f = new File(from);
+ if ( ! f.exists() ) {
+ System.out.println("Input file does not exist or cannot be read.");
+ System.exit(1);
+ }
+ input_list = from;
+
+ f = new File(to);
+ if ( ! f.isDirectory() ) {
+ System.out.println("'to' must be a directory");
+ System.exit(1);
+ }
+
+ String databasedir = to + "/database/databases";
+ //String databasename = databasedir + "/DuccState";
+
+ state_url = "plocal:" + databasedir + "/DuccState";
+ state_url = "remote:bluej538/DuccState";
+ System.setProperty("ducc.state.database.url", state_url);
+ }
+
+ void closeStream(InputStream in)
+ {
+ try { in.close(); } catch(Exception e) {}
+ }
+
+ void loadJob(String f)
+ {
+ String methodName = "loadJob";
+ IDuccWorkJob job = null;
+
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+
+ try {
+ long now = System.currentTimeMillis();
+ fis = new FileInputStream(f);
+ in = new ObjectInputStream(fis);
+ job = (IDuccWorkJob) in.readObject();
+ logger.info(methodName, job.getDuccId(), "Time to read job:", System.currentTimeMillis() - now);
+ hmd.saveJobUnsafe(job);
+ } catch(Exception e) {
+ logger.info(methodName, null, e);
+ } finally {
+ closeStream(in);
+ closeStream(fis);
+ }
+ }
+
+ void loadReservation(String f)
+ {
+ String methodName = "loadReservation";
+ IDuccWorkReservation res = null;
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+
+ try {
+ long now = System.currentTimeMillis();
+ fis = new FileInputStream(f);
+ in = new ObjectInputStream(fis);
+ res = (IDuccWorkReservation) in.readObject();
+ logger.info(methodName, res.getDuccId(), "Time to read reservation:", System.currentTimeMillis() - now);
+ hmd.saveReservationUnsafe(res);
+ } catch(Exception e) {
+ logger.info(methodName, null, e);
+ } finally {
+ closeStream(in);
+ closeStream(fis);
+ counter.getAndDecrement();
+ }
+ }
+
+ void loadService(String f)
+ {
+ String methodName = "loadService";
+ IDuccWorkService svc = null;
+
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+
+ try {
+ long now = System.currentTimeMillis();
+ fis = new FileInputStream(f);
+ in = new ObjectInputStream(fis);
+ svc = (IDuccWorkService) in.readObject();
+ logger.info(methodName, svc.getDuccId(), "Time to read service:", System.currentTimeMillis() - now);
+ hmd.saveServiceUnsafe(svc);
+ } catch(Exception e) {
+ logger.info(methodName, null, "Error reading or saving service:", f);
+ logger.info(methodName, null, e);
+ } finally {
+ closeStream(in);
+ closeStream(fis);
+ }
+ }
+
+ void loadCheckpoint(String ckpt)
+ throws Exception
+ {
+ String methodName = "loadCheckpoint";
+
+ File f = new File(ckpt);
+ if ( ! f.exists() ) {
+ logger.info(methodName, null, "No checkpoint file to convert.");
+ return;
+ }
+
+ //
+ // A note - the Checkpointable object might be in the "wrong" package and can't be
+ // cast properly. When putting it into database we have to pick out the
+ // fields anyway. So here we use introspection to get the fields and
+ // create the database entries.
+ //
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+ try {
+ fis = new FileInputStream(ckpt);
+ in = new ObjectInputStream(fis);
+
+ Object xobj = (Object) in.readObject();
+ Class<?> cl = xobj.getClass();
+ Field p2jfield = cl.getDeclaredField("processToJobMap");
+ p2jfield.setAccessible(true);
+ ConcurrentHashMap<DuccId, DuccId> p2jmap = (ConcurrentHashMap<DuccId, DuccId>) p2jfield.get(xobj);
+
+ Field wmField = cl.getDeclaredField("workMap");
+ wmField.setAccessible(true);
+ DuccWorkMap workMap = (DuccWorkMap) wmField.get(xobj);
+
+ hmd.checkpoint(workMap, p2jmap);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } finally {
+ fis.close();
+ in.close();
+ }
+
+ }
+
+
+ void run()
+ throws Exception
+ {
+ //String methodName = "run";
+ //long now = System.currentTimeMillis();
+
+ hmd = new HistoryManagerDb(logger);
+
+ BufferedReader br = new BufferedReader(new FileReader(input_list));
+ String f = null;
+ while ( (f = br.readLine()) != null ) {
+ if ( f.endsWith(".dwj") ) { loadJob(f); continue; }
+ if ( f.endsWith(".dwr") ) { loadReservation(f); continue; }
+ if ( f.endsWith(".dws") ) { loadService(f); continue; }
+ if ( f.endsWith(".ckpt") ) { loadCheckpoint(f); continue; }
+ }
+ br.close();
+ }
+
+ public static void main(String[] args)
+ {
+ if ( args.length != 2 ) {
+ System.out.println("USage: DbLoader from to");
+ System.out.println("");
+ System.out.println("Where:");
+ System.out.println(" from");
+ System.out.println(" is a file with the fully-qualified names of files you want loaded into he db.");
+ System.out.println(" to");
+ System.out.println(" is the DUCC_HOME contining the new database");
+ System.out.println("");
+ System.out.println("THe database must be started and initialized with the correct schema.");
+ System.exit(1);
+ }
+
+
+ DbListLoader dbl = null;
+ try {
+ dbl = new DbListLoader(args[0], args[1]);
+ dbl.run();
+ } catch ( Exception e ) {
+ e.printStackTrace();
+ }
+ }
+
+}