You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by bf...@apache.org on 2012/03/18 01:01:58 UTC
svn commit: r1302040 - in /oodt/trunk/pge/src:
main/java/org/apache/oodt/cas/pge/ main/java/org/apache/oodt/cas/pge/config/
main/java/org/apache/oodt/cas/pge/logging/
main/java/org/apache/oodt/cas/pge/metadata/ test/org/apache/oodt/cas/pge/
test/org/ap...
Author: bfoster
Date: Sun Mar 18 00:01:57 2012
New Revision: 1302040
URL: http://svn.apache.org/viewvc?rev=1302040&view=rev
Log:
Improved CAS-PGE logging; CAS-PGE now fails when a product ingest fails; and the PgeConfigBuilder is now configurable via PgeTaskMetKeys
----------------
OODT-418
OODT-419
OODT-420
Added:
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/config/
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/config/MockPgeConfigBuilder.java (with props)
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/writers/
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/writers/MockSciPgeConfigFileWriter.java (with props)
Modified:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java
Modified: oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
URL: http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java?rev=1302040&r1=1302039&r2=1302040&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java Sun Mar 18 00:01:57 2012
@@ -26,6 +26,7 @@ import static org.apache.oodt.cas.pge.me
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.INGEST_FILE_MANAGER_URL;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.MET_FILE_EXT;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.NAME;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.PGE_CONFIG_BUILDER;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.PGE_RUNTIME;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.PROPERTY_ADDERS;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.REQUIRED_METADATA;
@@ -36,12 +37,12 @@ import static org.apache.oodt.cas.pge.me
//JDK imports
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.CharBuffer;
+import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.logging.Handler;
@@ -49,8 +50,10 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
-//OODT imports
+//Apache imports
import org.apache.commons.lang.Validate;
+
+//OODT imports
import org.apache.oodt.cas.crawl.ProductCrawler;
import org.apache.oodt.cas.crawl.StdProductCrawler;
import org.apache.oodt.cas.crawl.status.IngestStatus;
@@ -60,6 +63,7 @@ import org.apache.oodt.cas.metadata.util
import org.apache.oodt.cas.pge.config.DynamicConfigFile;
import org.apache.oodt.cas.pge.config.OutputDir;
import org.apache.oodt.cas.pge.config.PgeConfig;
+import org.apache.oodt.cas.pge.config.PgeConfigBuilder;
import org.apache.oodt.cas.pge.config.RegExprOutputFiles;
import org.apache.oodt.cas.pge.config.RenamingConv;
import org.apache.oodt.cas.pge.config.XmlFilePgeConfigBuilder;
@@ -97,74 +101,142 @@ public class PGETaskInstance implements
protected String workflowInstId;
protected PgeMetadata pgeMetadata;
protected PgeConfig pgeConfig;
+ protected Handler logHandler;
protected PGETaskInstance() {}
+ public void run(Metadata metadata, WorkflowTaskConfiguration config)
+ throws WorkflowTaskInstanceException {
+ try {
+ // Initialize CAS-PGE.
+ pgeMetadata = createPgeMetadata(metadata, config);
+ pgeConfig = createPgeConfig();
+ runPropertyAdders();
+ wm = createWorkflowManagerClient();
+ workflowInstId = getWorkflowInstanceId();
+
+ // Initialize Logger.
+ initializePgeLogger();
+
+ // Setup the PGE.
+ createExeDir();
+ createOuputDirsIfRequested();
+ updateStatus(CONF_FILE_BUILD.getWorkflowStatusName());
+ createSciPgeConfigFiles();
+
+ // Run the PGE and process its data.
+ runPge();
+
+ // Generate product metadata.
+ generateMetadataForProducts();
+
+ // Ingest products.
+ runIngestCrawler(createProductCrawler());
+
+ // Commit dynamic metadata.
+ updateDynamicMetadata();
+ } catch (Exception e) {
+ log(Level.SEVERE, "PGETask FAILED!!! : " + e.getMessage(), e);
+ throw new WorkflowTaskInstanceException("PGETask FAILED!!! : "
+ + e.getMessage(), e);
+ } finally {
+ try { closePgeLogger(); } catch (Exception e) {}
+ }
+ }
+
protected void updateStatus(String status) throws Exception {
log(Level.INFO, "Updating status to workflow as [" + status + "]");
wm.updateWorkflowInstanceStatus(workflowInstId, status);
}
- protected Handler initializePgeLogger() throws Exception {
+ protected void initializePgeLogger() throws Exception {
File logDir = new File(pgeConfig.getExeDir(), "logs");
- if (!logDir.mkdirs()) {
+ if (!(logDir.exists() || logDir.mkdirs())) {
throw new Exception("mkdirs for logs directory return false");
}
- Handler handler = new PgeLogHandler(pgeMetadata.getMetadata(NAME),
+ logHandler = new PgeLogHandler(workflowInstId,
new FileOutputStream(new File(logDir, createLogFileName())));
- Logger.getLogger(PGETaskInstance.class.getName()).addHandler(handler);
- return handler;
+ Logger.getLogger(PGETaskInstance.class.getName()).addHandler(logHandler);
}
- protected String createLogFileName() {
+ protected String createLogFileName() throws Exception {
return pgeMetadata.getMetadata(NAME) + "." + System.currentTimeMillis()
+ ".log";
}
- protected void closePgeLogger(Handler handler) {
- handler.close();
- Logger.getLogger(PGETaskInstance.class.getName()).removeHandler(handler);
+ protected void closePgeLogger() throws Exception {
+ logHandler.close();
+ Logger.getLogger(PGETaskInstance.class.getName()).removeHandler(logHandler);
}
protected void log(Level level, String message) {
Logger.getLogger(PGETaskInstance.class.getName()).log(
- new PgeLogRecord(pgeMetadata.getMetadata(NAME), level, message));
+ new PgeLogRecord(workflowInstId, level, message));
}
protected void log(Level level, String message, Throwable t) {
Logger.getLogger(PGETaskInstance.class.getName()).log(
- new PgeLogRecord(pgeMetadata.getMetadata(NAME), level, message, t));
+ new PgeLogRecord(workflowInstId, level, message, t));
}
protected PgeMetadata createPgeMetadata(Metadata dynMetadata,
WorkflowTaskConfiguration config) throws Exception {
+ log(Level.INFO,
+ "Converting workflow configuration to static metadata...");
Metadata staticMetadata = new Metadata();
for (Object objKey : config.getProperties().keySet()) {
String key = (String) objKey;
PgeTaskMetKeys metKey = PgeTaskMetKeys.getByName(key);
if (metKey != null && metKey.isVector()) {
- staticMetadata.addMetadata(key,
- Lists.newArrayList(Splitter.on(",").trimResults()
- .omitEmptyStrings()
- .split(config.getProperty(key))));
+ List<String> values = Lists.newArrayList(
+ Splitter.on(",").trimResults()
+ .omitEmptyStrings()
+ .split(config.getProperty(key)));
+ log(Level.FINEST, "Adding static metadata: key = [" + key
+ + "] value = " + values);
+ staticMetadata.addMetadata(key, values);
} else {
- staticMetadata.addMetadata(key, config.getProperty(key));
+ String value = config.getProperty(key);
+ log(Level.FINEST, "Adding static metadata: key = [" + key
+ + "] value = [" + value + "]");
+ staticMetadata.addMetadata(key, value);
}
}
+ log(Level.INFO, "Loading workflow context metadata...");
+ for (String key : dynMetadata.getAllKeys()) {
+ log(Level.FINEST,
+ "Adding dynamic metadata: key = [" + key + "] value = "
+ + dynMetadata.getAllMetadata(key));
+ }
return new PgeMetadata(staticMetadata, dynMetadata);
}
protected PgeConfig createPgeConfig() throws Exception {
- return new XmlFilePgeConfigBuilder().build(pgeMetadata);
+ log(Level.INFO, "Create PgeConfig...");
+ String pgeConfigBuilderClass = pgeMetadata
+ .getMetadata(PGE_CONFIG_BUILDER);
+ if (pgeConfigBuilderClass != null) {
+ log(Level.INFO, "Using PgeConfigBuilder: " + pgeConfigBuilderClass);
+ return ((PgeConfigBuilder) Class.forName(pgeConfigBuilderClass)
+ .newInstance()).build(pgeMetadata);
+ } else {
+ log(Level.INFO, "Using default PgeConfigBuilder: "
+ + XmlFilePgeConfigBuilder.class.getCanonicalName());
+ return new XmlFilePgeConfigBuilder().build(pgeMetadata);
+ }
}
protected void runPropertyAdders() throws Exception {
try {
- List<String> propertyAdders = pgeMetadata.getAllMetadata(PROPERTY_ADDERS);
+ log(Level.INFO, "Loading/Running property adders...");
+ List<String> propertyAdders = pgeMetadata
+ .getAllMetadata(PROPERTY_ADDERS);
if (propertyAdders != null) {
for (String propertyAdder : propertyAdders) {
runPropertyAdder(loadPropertyAdder(propertyAdder));
}
+ } else {
+ log(Level.INFO, "No property adders specified");
}
} catch (Exception e) {
throw new Exception("Failed to instanciate/run Property Adders : "
@@ -174,25 +246,30 @@ public class PGETaskInstance implements
protected ConfigFilePropertyAdder loadPropertyAdder(
String propertyAdderClasspath) throws Exception {
+ log(Level.FINE, "Loading property adder: " + propertyAdderClasspath);
return (ConfigFilePropertyAdder) Class.forName(propertyAdderClasspath)
.newInstance();
}
protected void runPropertyAdder(ConfigFilePropertyAdder propAdder)
throws Exception {
+ log(Level.INFO, "Running property adder: "
+ + propAdder.getClass().getCanonicalName());
propAdder.addConfigProperties(pgeMetadata,
pgeConfig.getPropertyAdderCustomArgs());
}
protected XmlRpcWorkflowManagerClient createWorkflowManagerClient()
throws Exception {
- String urlString = pgeMetadata.getMetadata(WORKFLOW_MANAGER_URL);
- Validate.notNull(urlString, "Must specify " + WORKFLOW_MANAGER_URL);
- return new XmlRpcWorkflowManagerClient(new URL(urlString));
+ String url = pgeMetadata.getMetadata(WORKFLOW_MANAGER_URL);
+ log(Level.INFO, "Creating WorkflowManager client for url [" + url + "]");
+ Validate.notNull(url, "Must specify " + WORKFLOW_MANAGER_URL);
+ return new XmlRpcWorkflowManagerClient(new URL(url));
}
protected String getWorkflowInstanceId() throws Exception {
String instanceId = pgeMetadata.getMetadata(CoreMetKeys.WORKFLOW_INST_ID);
+ log(Level.INFO, "Workflow instanceId is [" + instanceId + "]");
Validate.notNull(instanceId, "Must specify "
+ CoreMetKeys.WORKFLOW_INST_ID);
return instanceId;
@@ -201,7 +278,8 @@ public class PGETaskInstance implements
protected void createExeDir() throws Exception {
log(Level.INFO, "Creating PGE execution working directory: ["
+ pgeConfig.getExeDir() + "]");
- if (!new File(pgeConfig.getExeDir()).mkdirs()) {
+ File executionDir = new File(pgeConfig.getExeDir());
+ if (!(executionDir.exists() || executionDir.mkdirs())) {
throw new Exception("mkdirs returned false for creating ["
+ pgeConfig.getExeDir() + "]");
}
@@ -212,7 +290,8 @@ public class PGETaskInstance implements
if (outputDir.isCreateBeforeExe()) {
log(Level.INFO, "Creating PGE file ouput directory: ["
+ outputDir.getPath() + "]");
- if (!new File(outputDir.getPath()).mkdirs()) {
+ File dir = new File(outputDir.getPath());
+ if (!(dir.exists() || dir.mkdirs())) {
throw new Exception("mkdir returned false for creating ["
+ outputDir.getPath() + "]");
}
@@ -232,103 +311,147 @@ public class PGETaskInstance implements
protected void createSciPgeConfigFile(DynamicConfigFile dynamicConfigFile)
throws Exception {
Validate.notNull(dynamicConfigFile, "dynamicConfigFile cannot be null");
- log(Level.INFO, "Starting creation of science PGE file [" + dynamicConfigFile.getFilePath() + "]...");
+ log(Level.FINE, "Starting creation of science PGE file ["
+ + dynamicConfigFile.getFilePath() + "]...");
// Create parent directory if it doesn't exist.
File parentDir = new File(dynamicConfigFile.getFilePath())
.getParentFile();
- if (!parentDir.exists()) {
- parentDir.mkdirs();
+ if (!(parentDir.exists() || parentDir.mkdirs())) {
+ throw new Exception("Failed to create directory where science PGE file ["
+ + dynamicConfigFile.getFilePath() + "] was to be written");
}
// Load writer and write file.
- log(Level.INFO, "Loading writer class for science PGE file [" + dynamicConfigFile.getFilePath() + "]...");
+ log(Level.FINE, "Loading writer class for science PGE file ["
+ + dynamicConfigFile.getFilePath() + "]...");
SciPgeConfigFileWriter writer = (SciPgeConfigFileWriter) Class.forName(
dynamicConfigFile.getWriterClass()).newInstance();
- log(Level.INFO, "Loaded writer [" + writer.getClass().getCanonicalName()
+ log(Level.FINE, "Loaded writer [" + writer.getClass().getCanonicalName()
+ "] for science PGE file [" + dynamicConfigFile.getFilePath()
+ "]...");
- log(Level.INFO, "Writing science PGE file [" + dynamicConfigFile.getFilePath() + "]...");
- writer.createConfigFile(dynamicConfigFile.getFilePath(),
+ log(Level.INFO,
+ "Writing science PGE file [" + dynamicConfigFile.getFilePath()
+ + "]...");
+ File configFile = writer.createConfigFile(dynamicConfigFile.getFilePath(),
pgeMetadata.asMetadata(), dynamicConfigFile.getArgs());
+ if (!configFile.exists()) {
+ throw new Exception("Writer failed to create config file ["
+ + configFile + "], exists returned false");
+ }
}
- protected void processOutput() throws FileNotFoundException, IOException {
- for (final OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
+ protected void generateMetadataForProducts() throws Exception {
+ log(Level.INFO, "Generating metadata for products...");
+ for (OutputDir outputDir : pgeConfig.getOuputDirs()) {
+
+ log(Level.FINE, "Looking for products in output directory ["
+ + outputDir.getPath() + "]");
+
File[] createdFiles = new File(outputDir.getPath()).listFiles();
+ log(Level.FINE, "Found files: " + Lists.newArrayList(createdFiles));
+
for (File createdFile : createdFiles) {
- Metadata outputMetadata = new Metadata();
- for (RegExprOutputFiles regExprFiles : outputDir
- .getRegExprOutputFiles()) {
- if (Pattern.matches(regExprFiles.getRegExp(),
- createdFile.getName())) {
- try {
- PcsMetFileWriter writer = (PcsMetFileWriter) Class
- .forName(regExprFiles.getConverterClass())
- .newInstance();
- outputMetadata
- .replaceMetadata(this.getMetadataForFile(
- (regExprFiles.getRenamingConv() != null) ? createdFile = this
- .renameFile(createdFile,
- regExprFiles.getRenamingConv())
- : createdFile, writer, regExprFiles
- .getArgs()));
- } catch (Exception e) {
- log(Level.SEVERE,
- "Failed to create metadata file for '" + createdFile
- + "' : " + e.getMessage(), e);
- }
+ log(Level.FINE, "Inspecting file [" + createdFile + "]");
+ List<RegExprOutputFiles> regexRules = findMatchingRegexRules(
+ createdFile, outputDir);
+ if (!regexRules.isEmpty()) {
+ Metadata productMetadata = new Metadata();
+ for (RegExprOutputFiles regexRule : regexRules) {
+ productMetadata.replaceMetadata(generateMetadataForProduct(
+ createdFile, regexRule));
+ }
+ if (productMetadata.getAllKeys().isEmpty()) {
+ throw new Exception("No metadata was generated for product ["
+ + createdFile + "]");
}
+ writeMetadataFile(productMetadata, createdFile.getAbsolutePath()
+ + "." + pgeMetadata.getMetadata(MET_FILE_EXT));
+ } else {
+ log(Level.FINE, "Ignoring file [" + createdFile
+ + "] because it doesn't matches any product regex rules"
+ + " for this directory [" + outputDir.getPath() + "]");
}
- if (outputMetadata.getAllKeys().size() > 0)
- this.writeFromMetadata(
- outputMetadata,
- createdFile.getAbsolutePath()
- + "."
- + pgeMetadata.getMetadata(MET_FILE_EXT));
}
}
}
- protected File renameFile(File file, RenamingConv renamingConv)
+ protected List<RegExprOutputFiles> findMatchingRegexRules(File file,
+ OutputDir outputDir) throws Exception {
+ log(Level.FINE, "Checking file [" + file
+ + "] against regex rules for output directory ["
+ + outputDir.getPath() + "]");
+ List<RegExprOutputFiles> regexRules = Lists.newArrayList();
+ for (RegExprOutputFiles regExprFiles : outputDir.getRegExprOutputFiles()) {
+ if (Pattern.matches(regExprFiles.getRegExp(), file.getName())) {
+ log(Level.FINE, "File [" + file + "] matched regex rule ["
+ + regExprFiles.getRegExp() + "]");
+ regexRules.add(regExprFiles);
+ }
+ }
+ return regexRules;
+ }
+
+ protected Metadata generateMetadataForProduct(
+ File product, RegExprOutputFiles regexRule) throws Exception {
+ log(Level.FINE, "Loading metadata writer ["
+ + regexRule.getConverterClass() + "] for product [" + product + "]");
+ PcsMetFileWriter writer = (PcsMetFileWriter) Class.forName(
+ regexRule.getConverterClass()).newInstance();
+ if (regexRule.getRenamingConv() != null) {
+ log(Level.FINE, "Renaming product [" + product + "]...");
+ product = renameProduct(product, regexRule.getRenamingConv());
+ }
+ return getMetadataForProduct(product, writer, regexRule.getArgs());
+ }
+
+ protected File renameProduct(File product, RenamingConv renamingConv)
throws Exception {
- Metadata curMetadata = this.pgeMetadata.asMetadata();
+ Metadata curMetadata = pgeMetadata.asMetadata();
curMetadata.replaceMetadata(renamingConv.getTmpReplaceMet());
String newFileName = PathUtils.doDynamicReplacement(
renamingConv.getRenamingString(), curMetadata);
- File newFile = new File(file.getParentFile(), newFileName);
- log(Level.INFO, "Renaming file '" + file.getAbsolutePath() + "' to '"
- + newFile.getAbsolutePath() + "'");
- if (!file.renameTo(newFile))
+ File newFile = new File(product.getParentFile(), newFileName);
+ log(Level.INFO, "Renaming product [" + product + "] to ["
+ + newFile + "]");
+ if (!product.renameTo(newFile)) {
throw new IOException("Renaming returned false");
+ }
return newFile;
}
- protected Metadata getMetadataForFile(File sciPgeCreatedDataFile,
+ protected Metadata getMetadataForProduct(File product,
PcsMetFileWriter writer, Object[] args) throws Exception {
- return writer.getMetadataForFile(sciPgeCreatedDataFile, this.pgeMetadata,
- args);
+ log(Level.INFO, "Generating metadata for product [" + product + "]");
+ return writer.getMetadataForFile(product, pgeMetadata, args);
}
- protected void writeFromMetadata(Metadata metadata, String toMetFilePath)
- throws FileNotFoundException, IOException {
+ protected void writeMetadataFile(Metadata metadata, String toFile)
+ throws Exception {
+ log(Level.INFO, "Writing out metadata file [" + toFile + "]");
new SerializableMetadata(metadata, "UTF-8", false)
- .writeMetadataToXmlStream(new FileOutputStream(toMetFilePath));
+ .writeMetadataToXmlStream(new FileOutputStream(toFile));
}
protected ScriptFile buildPgeRunScript() {
- ScriptFile sf = new ScriptFile(this.pgeConfig.getShellType());
- sf.setCommands(this.pgeConfig.getExeCmds());
+ log(Level.FINE,
+ "Creating PGE run script for shell [" + pgeConfig.getShellType()
+ + "] with contents " + pgeConfig.getExeCmds());
+ ScriptFile sf = new ScriptFile(pgeConfig.getShellType());
+ sf.setCommands(pgeConfig.getExeCmds());
return sf;
}
- protected String getScriptPath() {
- return new File(this.pgeConfig.getExeDir()).getAbsolutePath() + "/"
- + this.getPgeScriptName();
+ protected File getScriptPath() {
+ File script = new File(pgeConfig.getExeDir(), getPgeScriptName());
+ log(Level.FINE, "Script file with be written to [" + script + "]");
+ return script;
}
protected String getPgeScriptName() {
- return "sciPgeExeScript_" + this.pgeMetadata.getMetadata(NAME);
+ String pgeScriptName = "sciPgeExeScript_" + pgeMetadata.getMetadata(NAME);
+ log(Level.FINE, "Generated script file name [" + pgeScriptName + "]");
+ return pgeScriptName;
}
protected void runPge() throws Exception {
@@ -337,24 +460,32 @@ public class PGETaskInstance implements
OutputStream errOS = createStdErrLogger();
try {
long startTime = System.currentTimeMillis();
+ log(Level.INFO, "PGE start time [" + new Date(startTime) + "]");
// create script to run
- sf = this.buildPgeRunScript();
- sf.writeScriptFile(this.getScriptPath());
+ sf = buildPgeRunScript();
+ sf.writeScriptFile(getScriptPath().getAbsolutePath());
// run script and evaluate whether success or failure
- this.updateStatus(RUNNING_PGE.getWorkflowStatusName());
- if (!this.wasPgeSuccessful(ExecUtils.callProgram(
- this.pgeConfig.getShellType() + " " + this.getScriptPath(),
+ updateStatus(RUNNING_PGE.getWorkflowStatusName());
+ log(Level.INFO, "Starting execution of PGE...");
+ if (!wasPgeSuccessful(ExecUtils.callProgram(
+ pgeConfig.getShellType() + " " + getScriptPath(),
stdOS, errOS,
- new File(this.pgeConfig.getExeDir()).getAbsoluteFile())))
+ new File(pgeConfig.getExeDir()).getAbsoluteFile()))) {
throw new RuntimeException("Pge didn't finish successfully");
- else
+ } else {
log(Level.INFO,
"Successfully completed running: '" + sf.getCommands() + "'");
+ }
long endTime = System.currentTimeMillis();
- pgeMetadata.replaceMetadata(PGE_RUNTIME, (endTime - startTime) + "");
+ log(Level.INFO, "PGE end time [" + new Date(startTime) + "]");
+
+ long runTime = endTime - startTime;
+ log(Level.INFO, "PGE runtime in millis [" + runTime + "]");
+
+ pgeMetadata.replaceMetadata(PGE_RUNTIME, Long.toString(runTime));
} catch (Exception e) {
throw new Exception("Exception when executing PGE commands '"
@@ -378,21 +509,9 @@ public class PGETaskInstance implements
return returnCode == 0;
}
- protected void ingestProducts() throws Exception {
+ protected ProductCrawler createProductCrawler() throws Exception {
+ log(Level.INFO, "Configuring ProductCrawler...");
StdProductCrawler crawler = new StdProductCrawler();
- this.setCrawlerConfigurations(crawler);
- this.runIngestCrawler(crawler, this.getOutputDirs());
- }
-
- protected List<File> getOutputDirs() {
- List<File> outputDirs = new LinkedList<File>();
- for (OutputDir outputDir : pgeConfig.getOuputDirs())
- outputDirs.add(new File(outputDir.getPath()));
- return outputDirs;
- }
-
- protected void setCrawlerConfigurations(StdProductCrawler crawler)
- throws Exception {
crawler.setMetFileExtension(pgeMetadata.getMetadata(MET_FILE_EXT));
crawler.setClientTransferer(pgeMetadata
.getMetadata(INGEST_CLIENT_TRANSFER_SERVICE_FACTORY));
@@ -417,39 +536,39 @@ public class PGETaskInstance implements
"true") : true;
crawler.setCrawlForDirs(crawlForDirs);
crawler.setNoRecur(!recur);
- log(Level.INFO,
+ log(Level.FINE,
"Passing Workflow Metadata to CAS-Crawler as global metadata . . .");
- crawler.setGlobalMetadata(this.pgeMetadata
- .asMetadata(PgeMetadata.Type.DYNAMIC));
+ crawler.setGlobalMetadata(pgeMetadata.asMetadata(PgeMetadata.Type.DYNAMIC));
+ log(Level.FINE, "Created ProductCrawler ["
+ + crawler.getClass().getCanonicalName() + "]");
+ return crawler;
}
- protected void runIngestCrawler(StdProductCrawler crawler,
- List<File> crawlDirs) {
- File currentDir = null;
- try {
- this.updateStatus(CRAWLING.getWorkflowStatusName());
- boolean attemptIngestAll = Boolean.parseBoolean(pgeMetadata
- .getMetadata(ATTEMPT_INGEST_ALL));
- for (File crawlDir : crawlDirs) {
- currentDir = crawlDir;
- log(Level.INFO, "Executing StdProductCrawler in productPath: ["
- + crawlDir + "]");
- crawler.crawl(crawlDir);
- if (!attemptIngestAll)
- this.verifyIngests(crawler);
+ protected void runIngestCrawler(ProductCrawler crawler) throws Exception {
+ // Determine directories to crawl.
+ List<File> crawlDirs = new LinkedList<File>();
+ for (OutputDir outputDir : pgeConfig.getOuputDirs()) {
+ crawlDirs.add(new File(outputDir.getPath()));
+ }
+
+ // Start crawlin...
+ updateStatus(CRAWLING.getWorkflowStatusName());
+ boolean attemptIngestAll = Boolean.parseBoolean(pgeMetadata
+ .getMetadata(ATTEMPT_INGEST_ALL));
+ for (File crawlDir : crawlDirs) {
+ log(Level.INFO, "Crawling for products in [" + crawlDir + "]");
+ crawler.crawl(crawlDir);
+ if (!attemptIngestAll) {
+ verifyIngests(crawler);
}
- if (attemptIngestAll)
- this.verifyIngests(crawler);
- } catch (Exception e) {
- log(Level.WARNING,
- "Failed while attempting to ingest products while crawling directory '"
- + currentDir
- + "' (all products may not have been ingested) : "
- + e.getMessage(), e);
+ }
+ if (attemptIngestAll) {
+ verifyIngests(crawler);
}
}
protected void verifyIngests(ProductCrawler crawler) throws Exception {
+ log(Level.INFO, "Verifying ingests successful...");
boolean ingestsSuccess = true;
String exceptionMsg = "";
for (IngestStatus status : crawler.getIngestStatus()) {
@@ -465,49 +584,17 @@ public class PGETaskInstance implements
+ status.getResult() + "',msg='" + status.getMessage() + "']");
}
}
- if (!ingestsSuccess)
+ if (!ingestsSuccess) {
throw new Exception(exceptionMsg);
+ } else {
+ log(Level.INFO, "Ingests were successful");
+ }
}
- protected void updateDynamicMetadata() {
+ protected void updateDynamicMetadata() throws Exception {
pgeMetadata.commitMarkedDynamicMetadataKeys();
- }
-
- public void run(Metadata metadata, WorkflowTaskConfiguration config)
- throws WorkflowTaskInstanceException {
- Handler handler = null;
- try {
- // Initialize CAS-PGE.
- pgeMetadata = createPgeMetadata(metadata, config);
- pgeConfig = createPgeConfig();
- runPropertyAdders();
- wm = createWorkflowManagerClient();
- workflowInstId = getWorkflowInstanceId();
-
- // Initialize Logger.
- handler = initializePgeLogger();
-
- // Setup the PGE.
- createExeDir();
- createOuputDirsIfRequested();
- updateStatus(CONF_FILE_BUILD.getWorkflowStatusName());
- createSciPgeConfigFiles();
-
- // Run the PGE and proccess it data.
- runPge();
-
- // Update metadata.
- processOutput();
- updateDynamicMetadata();
-
- // Inject products.
- ingestProducts();
- } catch (Exception e) {
- throw new WorkflowTaskInstanceException("PGETask failed : "
- + e.getMessage(), e);
- } finally {
- closePgeLogger(handler);
- }
+ wm.updateMetadataForWorkflow(workflowInstId,
+ pgeMetadata.asMetadata(PgeMetadata.Type.DYNAMIC));
}
/**
@@ -540,7 +627,6 @@ public class PGETaskInstance implements
@Override
public void flush() {
- System.out.println("HELLO");
if (buffer.position() > 0) {
char[] flushContext = new char[buffer.position()];
System.arraycopy(buffer.array(), 0, flushContext, 0,
Modified: oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java
URL: http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java?rev=1302040&r1=1302039&r2=1302040&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java Sun Mar 18 00:01:57 2012
@@ -92,14 +92,13 @@ public class XmlFilePgeConfigBuilder imp
public PgeConfig build(PgeMetadata pgeMetadata) throws IOException {
try {
PgeConfig pgeConfig = new PgeConfig();
- this.buildImports(this.fillIn(pgeMetadata
- .getMetadata(CONFIG_FILE_PATH.getName()),
- pgeMetadata.asMetadata()), null, pgeConfig, pgeMetadata);
+ buildImports(
+ fillIn(pgeMetadata.getMetadata(CONFIG_FILE_PATH),
+ pgeMetadata.asMetadata()), null, pgeConfig, pgeMetadata);
return pgeConfig;
} catch (Exception e) {
- e.printStackTrace();
throw new IOException("Failed to build PgeConfig : "
- + e.getMessage());
+ + e.getMessage(), e);
}
}
@@ -163,7 +162,11 @@ public class XmlFilePgeConfigBuilder imp
pgeConfig.addOuputDirAndExpressions(outputDir);
// add local pge metadata to global pge metadata with given namespace
- pgeMetadata.replaceMetadata(localPgeMetadata, namespace);
+ if (namespace != null) {
+ pgeMetadata.replaceMetadata(localPgeMetadata, namespace);
+ } else {
+ pgeMetadata.replaceMetadata(localPgeMetadata);
+ }
}
private PgeMetadata getCustomMetadata(Element customMetadataElem, PgeMetadata pgeMetadata)
Modified: oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java
URL: http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java?rev=1302040&r1=1302039&r2=1302040&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java Sun Mar 18 00:01:57 2012
@@ -32,18 +32,18 @@ import java.util.logging.StreamHandler;
*/
public class PgeLogHandler extends StreamHandler {
- private String pgeName;
+ private String workflowInstId;
- public PgeLogHandler(String pgeName, OutputStream os)
+ public PgeLogHandler(String workflowInstId, OutputStream os)
throws SecurityException, FileNotFoundException {
super(os, new SimpleFormatter());
- this.pgeName = pgeName;
+ this.workflowInstId = workflowInstId;
}
@Override
public void publish(LogRecord record) {
if (record instanceof PgeLogRecord
- && pgeName.equals(((PgeLogRecord) record).getPgeName())) {
+ && workflowInstId.equals(((PgeLogRecord) record).getWorkflowInstId())) {
super.publish(record);
}
}
Modified: oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java
URL: http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java?rev=1302040&r1=1302039&r2=1302040&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java Sun Mar 18 00:01:57 2012
@@ -29,19 +29,19 @@ public class PgeLogRecord extends LogRec
private static final long serialVersionUID = 2334166761035931387L;
- private String pgeName;
+ private String workflowInstId;
- public PgeLogRecord(String pgeName, Level level, String msg) {
+ public PgeLogRecord(String workflowInstId, Level level, String msg) {
super(level, msg);
- this.pgeName = pgeName;
+ this.workflowInstId = workflowInstId;
}
- public PgeLogRecord(String pgeName, Level level, String msg, Throwable t) {
- this(pgeName, level, msg);
+ public PgeLogRecord(String workflowInstId, Level level, String msg, Throwable t) {
+ this(workflowInstId, level, msg);
setThrown(t);
}
- public String getPgeName() {
- return pgeName;
+ public String getWorkflowInstId() {
+ return workflowInstId;
}
}
Modified: oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java
URL: http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java?rev=1302040&r1=1302039&r2=1302040&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java Sun Mar 18 00:01:57 2012
@@ -39,6 +39,12 @@ public enum PgeTaskMetKeys {
"PGETask/ConfigFilePath",
"PGETask_ConfigFilePath"),
/**
+ * CAS-PGE's ConfigBuilder classpath.
+ */
+ PGE_CONFIG_BUILDER(
+ "PGETask/PgeConfigBuilder",
+ "PGETask/PgeConfigBuilder"),
+ /**
* List of {@link ConfigFilePropertyAdder}s classpaths to be run.
*/
PROPERTY_ADDERS(
Modified: oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
URL: http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java?rev=1302040&r1=1302039&r2=1302040&view=diff
==============================================================================
--- oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java (original)
+++ oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java Sun Mar 18 00:01:57 2012
@@ -18,13 +18,17 @@ package org.apache.oodt.cas.pge;
//OODT static imports
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.NAME;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.CONFIG_FILE_PATH;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.PGE_CONFIG_BUILDER;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.PROPERTY_ADDERS;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.REQUIRED_METADATA;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.WORKFLOW_MANAGER_URL;
//JDK imports
import java.io.File;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
-import java.util.logging.Handler;
import java.util.logging.Level;
//Apache imports
@@ -34,13 +38,21 @@ import org.apache.commons.io.FileUtils;
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.pge.PGETaskInstance;
import org.apache.oodt.cas.pge.PGETaskInstance.LoggerOuputStream;
+import org.apache.oodt.cas.pge.config.DynamicConfigFile;
+import org.apache.oodt.cas.pge.config.MockPgeConfigBuilder;
+import org.apache.oodt.cas.pge.config.OutputDir;
import org.apache.oodt.cas.pge.config.PgeConfig;
import org.apache.oodt.cas.pge.metadata.PgeMetadata;
import org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys;
+import org.apache.oodt.cas.pge.metadata.PgeTaskStatus;
+import org.apache.oodt.cas.pge.writers.MockSciPgeConfigFileWriter;
+import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
+import org.apache.oodt.cas.workflow.system.XmlRpcWorkflowManagerClient;
//Google imports
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
//JUnit imports
import junit.framework.TestCase;
@@ -52,8 +64,22 @@ import junit.framework.TestCase;
*/
public class TestPGETaskInstance extends TestCase {
+ private List<PGETaskInstance> pgeTasks = Lists.newArrayList();
+ private List<File> tmpDirs = Lists.newArrayList();
+
+ public void tearDown() throws Exception {
+ for (PGETaskInstance pgeTask : pgeTasks) {
+ pgeTask.closePgeLogger();
+ }
+ pgeTasks.clear();
+ for (File tmpDir : tmpDirs) {
+ FileUtils.forceDelete(tmpDir);
+ }
+ tmpDirs.clear();
+ }
+
public void testLoadPropertyAdders() throws Exception {
- PGETaskInstance pgeTask = new PGETaskInstance();
+ PGETaskInstance pgeTask = createTestInstance();
ConfigFilePropertyAdder propAdder = pgeTask
.loadPropertyAdder(MockConfigFilePropertyAdder.class
.getCanonicalName());
@@ -62,42 +88,30 @@ public class TestPGETaskInstance extends
}
public void testRunPropertyAdders() throws Exception {
- PGETaskInstance pgeTask = new PGETaskInstance();
- Metadata staticMet = new Metadata();
- staticMet.addMetadata(PROPERTY_ADDERS.getName(),
+ PGETaskInstance pgeTask = createTestInstance();
+ pgeTask.pgeMetadata.replaceMetadata(PROPERTY_ADDERS,
MockConfigFilePropertyAdder.class.getCanonicalName());
- Metadata dynMet = new Metadata();
- pgeTask.pgeMetadata = new PgeMetadata(staticMet, dynMet);
- pgeTask.pgeConfig = new PgeConfig();
pgeTask.pgeConfig.setPropertyAdderCustomArgs(new Object[] { "key",
"value" });
pgeTask.runPropertyAdders();
assertEquals("value", pgeTask.pgeMetadata.getMetadata("key"));
- staticMet = new Metadata();
- dynMet = new Metadata();
- dynMet.addMetadata(PROPERTY_ADDERS.getName(), Lists.newArrayList(
+ pgeTask.pgeMetadata.replaceMetadata(
+ MockConfigFilePropertyAdder.RUN_COUNTER, "0");
+ pgeTask.pgeMetadata.replaceMetadata(PROPERTY_ADDERS, Lists.newArrayList(
MockConfigFilePropertyAdder.class.getCanonicalName(),
MockConfigFilePropertyAdder.class.getCanonicalName()));
- pgeTask.pgeMetadata = new PgeMetadata(staticMet, dynMet);
- pgeTask.pgeConfig = new PgeConfig();
- pgeTask.pgeConfig.setPropertyAdderCustomArgs(new Object[] { "key",
- "value" });
pgeTask.runPropertyAdders();
assertEquals("value", pgeTask.pgeMetadata.getMetadata("key"));
assertEquals("2",
pgeTask.pgeMetadata
.getMetadata(MockConfigFilePropertyAdder.RUN_COUNTER));
+ pgeTask.pgeMetadata.replaceMetadata(
+ MockConfigFilePropertyAdder.RUN_COUNTER, "0");
System.setProperty(PgeTaskMetKeys.USE_LEGACY_PROPERTY, "true");
- staticMet = new Metadata();
- dynMet = new Metadata();
- dynMet.addMetadata(PROPERTY_ADDERS.getName(),
+ pgeTask.pgeMetadata.replaceMetadata(PROPERTY_ADDERS.getName(),
MockConfigFilePropertyAdder.class.getCanonicalName());
- pgeTask.pgeMetadata = new PgeMetadata(staticMet, dynMet);
- pgeTask.pgeConfig = new PgeConfig();
- pgeTask.pgeConfig.setPropertyAdderCustomArgs(new Object[] { "key",
- "value" });
pgeTask.runPropertyAdders();
assertEquals("value", pgeTask.pgeMetadata.getMetadata("key"));
assertEquals("1",
@@ -109,92 +123,63 @@ public class TestPGETaskInstance extends
final String PGE_NAME = "PGE_Test";
final String PGE_REQUIRED_METADATA = "Filename, FileLocation ";
final String PROP_ADDERS = "some.prop.adder.classpath,some.other.classpath";
- PGETaskInstance pgeTask = new PGETaskInstance();
+ PGETaskInstance pgeTask = createTestInstance();
Metadata dynMet = new Metadata();
WorkflowTaskConfiguration config = new WorkflowTaskConfiguration();
- config.addConfigProperty(PgeTaskMetKeys.NAME.getName(), PGE_NAME);
- config.addConfigProperty(PgeTaskMetKeys.REQUIRED_METADATA.getName(),
+ config.addConfigProperty(NAME.getName(), PGE_NAME);
+ config.addConfigProperty(REQUIRED_METADATA.getName(),
PGE_REQUIRED_METADATA);
- config.addConfigProperty(PgeTaskMetKeys.PROPERTY_ADDERS.getName(),
- PROP_ADDERS);
+ config.addConfigProperty(PROPERTY_ADDERS.getName(), PROP_ADDERS);
PgeMetadata pgeMet = pgeTask.createPgeMetadata(dynMet, config);
- assertEquals(1, pgeMet.getAllMetadata(PgeTaskMetKeys.NAME.getName())
- .size());
- assertEquals(PGE_NAME,
- pgeMet.getAllMetadata(PgeTaskMetKeys.NAME.getName()).get(0));
- assertEquals(2,
- pgeMet.getAllMetadata(PgeTaskMetKeys.REQUIRED_METADATA.getName())
- .size());
- assertTrue(pgeMet.getAllMetadata(
- PgeTaskMetKeys.REQUIRED_METADATA.getName()).contains("Filename"));
- assertTrue(pgeMet.getAllMetadata(
- PgeTaskMetKeys.REQUIRED_METADATA.getName())
- .contains("FileLocation"));
- assertEquals(2,
- pgeMet.getAllMetadata(PgeTaskMetKeys.PROPERTY_ADDERS.getName())
- .size());
- assertTrue(pgeMet
- .getAllMetadata(PgeTaskMetKeys.PROPERTY_ADDERS.getName()).contains(
- "some.prop.adder.classpath"));
- assertTrue(pgeMet
- .getAllMetadata(PgeTaskMetKeys.PROPERTY_ADDERS.getName()).contains(
- "some.other.classpath"));
+ assertEquals(1, pgeMet.getAllMetadata(NAME).size());
+ assertEquals(PGE_NAME, pgeMet.getAllMetadata(NAME).get(0));
+ assertEquals(2, pgeMet.getAllMetadata(REQUIRED_METADATA).size());
+ assertTrue(pgeMet.getAllMetadata(REQUIRED_METADATA).contains("Filename"));
+ assertTrue(pgeMet.getAllMetadata(REQUIRED_METADATA).contains(
+ "FileLocation"));
+ assertEquals(2, pgeMet.getAllMetadata(PROPERTY_ADDERS).size());
+ assertTrue(pgeMet.getAllMetadata(PROPERTY_ADDERS).contains(
+ "some.prop.adder.classpath"));
+ assertTrue(pgeMet.getAllMetadata(PROPERTY_ADDERS).contains(
+ "some.other.classpath"));
// Verify still works when only one property adder is specified.
- pgeTask = new PGETaskInstance();
+ pgeTask = createTestInstance();
config = new WorkflowTaskConfiguration();
config.addConfigProperty(PgeTaskMetKeys.PROPERTY_ADDERS.getName(),
"one.prop.adder.only");
pgeMet = pgeTask.createPgeMetadata(dynMet, config);
- assertEquals(1,
- pgeMet.getAllMetadata(PgeTaskMetKeys.PROPERTY_ADDERS.getName())
- .size());
- assertEquals("one.prop.adder.only",
- pgeMet.getAllMetadata(PgeTaskMetKeys.PROPERTY_ADDERS.getName())
- .get(0));
+ assertEquals(1, pgeMet.getAllMetadata(PROPERTY_ADDERS).size());
+ assertEquals("one.prop.adder.only", pgeMet
+ .getAllMetadata(PROPERTY_ADDERS).get(0));
}
public void testLogger() throws Exception {
File tmpFile = File.createTempFile("bogus", "bogus");
File tmpDir = tmpFile.getParentFile();
tmpFile.delete();
- File tmpDir1 = new File(tmpDir, UUID.randomUUID().toString());
- assertTrue(tmpDir1.mkdirs());
- File tmpDir2 = new File(tmpDir, UUID.randomUUID().toString());
- assertTrue(tmpDir2.mkdirs());
File tmpDir3 = new File(tmpDir, UUID.randomUUID().toString());
assertTrue(tmpDir3.mkdirs());
- final String PGE_1_NAME = "PGE1";
- PGETaskInstance pgeTask1 = new PGETaskInstance();
- pgeTask1.pgeMetadata = new PgeMetadata();
- pgeTask1.pgeMetadata.replaceMetadata(NAME, PGE_1_NAME);
- pgeTask1.pgeConfig = new PgeConfig();
- pgeTask1.pgeConfig.setExeDir(tmpDir1.getAbsolutePath());
- Handler handler1 = pgeTask1.initializePgeLogger();
+ PGETaskInstance pgeTask1 = createTestInstance();
pgeTask1.log(Level.INFO, "pge1 message1");
pgeTask1.log(Level.INFO, "pge1 message2");
pgeTask1.log(Level.INFO, "pge1 message3");
- pgeTask1.closePgeLogger(handler1);
+ pgeTask1.closePgeLogger();
List<String> messages = FileUtils.readLines(
- new File(tmpDir1, "logs").listFiles()[0], "UTF-8");
+ new File(pgeTask1.pgeConfig.getExeDir() + "/logs").listFiles()[0],
+ "UTF-8");
assertEquals("INFO: pge1 message1", messages.get(1));
assertEquals("INFO: pge1 message2", messages.get(3));
assertEquals("INFO: pge1 message3", messages.get(5));
- final String PGE_2_NAME = "PGE2";
- PGETaskInstance pgeTask2 = new PGETaskInstance();
- pgeTask2.pgeMetadata = new PgeMetadata();
- pgeTask2.pgeMetadata.replaceMetadata(NAME, PGE_2_NAME);
- pgeTask2.pgeConfig = new PgeConfig();
- pgeTask2.pgeConfig.setExeDir(tmpDir2.getAbsolutePath());
- Handler handler2 = pgeTask2.initializePgeLogger();
+ PGETaskInstance pgeTask2 = createTestInstance();
pgeTask2.log(Level.SEVERE, "pge2 message1");
- pgeTask2.closePgeLogger(handler2);
- messages = FileUtils.readLines(new File(tmpDir2, "logs").listFiles()[0],
- "UTF-8");
+ pgeTask2.closePgeLogger();
+ messages = FileUtils.readLines(new File(pgeTask2.pgeConfig.getExeDir()
+ + "/logs").listFiles()[0], "UTF-8");
assertEquals("SEVERE: pge2 message1", messages.get(1));
PGETaskInstance pgeTask3 = new PGETaskInstance() {
@@ -203,15 +188,15 @@ public class TestPGETaskInstance extends
return new LoggerOuputStream(Level.INFO, 10);
}
};
+ pgeTask3.workflowInstId = "1234";
pgeTask3.pgeMetadata = new PgeMetadata();
- pgeTask3.pgeMetadata.replaceMetadata(NAME, "TestPGE");
pgeTask3.pgeConfig = new PgeConfig();
pgeTask3.pgeConfig.setExeDir(tmpDir3.getAbsolutePath());
- Handler handler3 = pgeTask3.initializePgeLogger();
+ pgeTask3.initializePgeLogger();
LoggerOuputStream los = pgeTask3.createStdOutLogger();
los.write("This is a test write to a log file".getBytes());
los.close();
- pgeTask3.closePgeLogger(handler3);
+ pgeTask3.closePgeLogger();
messages = FileUtils.readLines(new File(tmpDir3, "logs").listFiles()[0],
"UTF-8");
assertEquals(8, messages.size());
@@ -220,8 +205,140 @@ public class TestPGETaskInstance extends
assertEquals("INFO: to a log ", messages.get(5));
assertEquals("INFO: file", messages.get(7));
- FileUtils.forceDelete(tmpDir1);
- FileUtils.forceDelete(tmpDir2);
FileUtils.forceDelete(tmpDir3);
}
+
+ public void testUpdateStatus() throws Exception {
+ final Map<String, String> args = Maps.newHashMap();
+ PGETaskInstance pgeTask = createTestInstance();
+ pgeTask.wm = new XmlRpcWorkflowManagerClient(null) {
+ public boolean updateWorkflowInstanceStatus(String instanceId,
+ String status) {
+ args.put("InstanceId", instanceId);
+ args.put("Status", status);
+ return true;
+ }
+ };
+ String instanceId = "Test ID";
+ String status = PgeTaskStatus.CRAWLING.getWorkflowStatusName();
+ pgeTask.workflowInstId = instanceId;
+ pgeTask.updateStatus(status);
+ assertEquals(instanceId, args.get("InstanceId"));
+ assertEquals(status, args.get("Status"));
+ }
+
+ public void testCreatePgeConfig() throws Exception {
+ final String KEY = "TestKey";
+ final String VALUE = "TestValue";
+ File pgeConfigFile = new File(createTmpDir("1234"), "pgeConfig.xml");
+ FileUtils.writeLines(pgeConfigFile, "UTF-8",
+ Lists.newArrayList(
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
+ "<pgeConfig>",
+ " <customMetadata>",
+ " <metadata key=\"" + KEY + "\" val=\"" + VALUE
+ + "\"/>",
+ " </customMetadata>",
+ "</pgeConfig>"));
+ PGETaskInstance pgeTask = createTestInstance();
+ pgeTask.pgeMetadata.replaceMetadata(CONFIG_FILE_PATH,
+ pgeConfigFile.getAbsolutePath());
+ PgeConfig pgeConfig = pgeTask.createPgeConfig();
+ assertNotNull(pgeConfig);
+ assertEquals(VALUE, pgeTask.pgeMetadata.getMetadata(KEY));
+
+ pgeTask = createTestInstance();
+ pgeTask.pgeMetadata.replaceMetadata(PGE_CONFIG_BUILDER,
+ MockPgeConfigBuilder.class.getCanonicalName());
+ pgeConfig = pgeTask.createPgeConfig();
+ assertEquals(MockPgeConfigBuilder.MOCK_EXE_DIR, pgeConfig.getExeDir());
+ }
+
+ public void testCreateWorkflowManagerClient() throws Exception {
+ PGETaskInstance pgeTask = createTestInstance();
+ pgeTask.pgeMetadata.replaceMetadata(WORKFLOW_MANAGER_URL,
+ "http://localhost:8888");
+ XmlRpcWorkflowManagerClient wmClient =
+ pgeTask.createWorkflowManagerClient();
+ assertNotNull(wmClient);
+ }
+
+ public void testGetWorkflowInstanceId() throws Exception {
+ String workflowInstId = "12345";
+ PGETaskInstance pgeTask = createTestInstance();
+ pgeTask.pgeMetadata.replaceMetadata(CoreMetKeys.WORKFLOW_INST_ID,
+ workflowInstId);
+ assertEquals(workflowInstId, pgeTask.getWorkflowInstanceId());
+ }
+
+ public void testCreateExeDir() throws Exception {
+ PGETaskInstance pgeTask = createTestInstance();
+ File exeDir = new File(pgeTask.pgeConfig.getExeDir());
+ FileUtils.deleteDirectory(exeDir);
+ assertFalse(exeDir.exists());
+ pgeTask.createExeDir();
+ assertTrue(exeDir.exists());
+ }
+
+ public void testCreateOuputDirsIfRequested() throws Exception {
+ PGETaskInstance pgeTask = createTestInstance();
+ File outputDir1 = createTmpDir("outputDir1");
+ FileUtils.forceDelete(outputDir1);
+ File outputDir2 = createTmpDir("outputDir2");
+ FileUtils.forceDelete(outputDir2);
+ File outputDir3 = new File("/some/file/path");
+ assertFalse(outputDir1.exists());
+ assertFalse(outputDir2.exists());
+ assertFalse(outputDir3.exists());
+ pgeTask.pgeConfig.addOuputDirAndExpressions(new OutputDir(outputDir1
+ .getAbsolutePath(), true));
+ pgeTask.pgeConfig.addOuputDirAndExpressions(new OutputDir(outputDir2
+ .getAbsolutePath(), true));
+ pgeTask.pgeConfig.addOuputDirAndExpressions(new OutputDir(outputDir3
+ .getAbsolutePath(), false));
+ pgeTask.createOuputDirsIfRequested();
+ assertTrue(outputDir1.exists());
+ assertTrue(outputDir2.exists());
+ assertFalse(outputDir3.exists());
+ }
+
+ public void testCreateSciPgeConfigFile() throws Exception {
+ File tmpDir = createTmpDir("sciPgeDir");
+ FileUtils.forceDelete(tmpDir);
+ assertFalse(tmpDir.exists());
+ PGETaskInstance pgeTask = createTestInstance();
+ File sciPgeConfigFile = new File(tmpDir, "SciPgeConfig.xml");
+ assertFalse(sciPgeConfigFile.exists());
+ pgeTask.createSciPgeConfigFile(new DynamicConfigFile(sciPgeConfigFile.getAbsolutePath(),
+ MockSciPgeConfigFileWriter.class.getCanonicalName(),
+ new Object[] {}));
+ assertTrue(sciPgeConfigFile.exists());
+ }
+
+ private PGETaskInstance createTestInstance() throws Exception {
+ return createTestInstance(UUID.randomUUID().toString());
+ }
+
+ private PGETaskInstance createTestInstance(String workflowInstId)
+ throws Exception {
+ PGETaskInstance pgeTask = new PGETaskInstance();
+ pgeTask.workflowInstId = workflowInstId;
+ pgeTask.pgeMetadata = new PgeMetadata();
+ pgeTask.pgeMetadata.replaceMetadata(NAME, "TestPGE");
+ pgeTask.pgeConfig = new PgeConfig();
+ File exeDir = createTmpDir(workflowInstId);
+ pgeTask.pgeConfig.setExeDir(exeDir.getAbsolutePath());
+ pgeTask.initializePgeLogger();
+ pgeTasks.add(pgeTask);
+ return pgeTask;
+ }
+
+ private File createTmpDir(String workflowInstId) throws Exception {
+ File tmpFile = File.createTempFile("bogus", "bogus");
+ File tmpDir = new File(tmpFile.getParentFile(), workflowInstId);
+ tmpFile.delete();
+ tmpDir.mkdirs();
+ tmpDirs.add(tmpDir);
+ return tmpDir;
+ }
}
Added: oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/config/MockPgeConfigBuilder.java
URL: http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/config/MockPgeConfigBuilder.java?rev=1302040&view=auto
==============================================================================
--- oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/config/MockPgeConfigBuilder.java (added)
+++ oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/config/MockPgeConfigBuilder.java Sun Mar 18 00:01:57 2012
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge.config;
+
+//OODT imports
+import org.apache.oodt.cas.pge.metadata.PgeMetadata;
+
+/**
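+ * Mock implementation of {@link PgeConfigBuilder} that ignores the supplied metadata and always returns a new {@link PgeConfig} whose exe dir is {@link #MOCK_EXE_DIR}.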
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class MockPgeConfigBuilder implements PgeConfigBuilder {
+
+ public static final String MOCK_EXE_DIR = "/mock/exe/dir";
+
+ public PgeConfig build(PgeMetadata pgeMetadata) throws Exception {
+ PgeConfig pgeConfig = new PgeConfig();
+ pgeConfig.setExeDir(MOCK_EXE_DIR);
+ return pgeConfig;
+ }
+}
Propchange: oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/config/MockPgeConfigBuilder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java
URL: http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java?rev=1302040&r1=1302039&r2=1302040&view=diff
==============================================================================
--- oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java (original)
+++ oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java Sun Mar 18 00:01:57 2012
@@ -39,18 +39,18 @@ import junit.framework.TestCase;
public class TestPgeLogHandler extends TestCase {
public void testSeparatesMultipleCasPgeLogWritesByPgeName() throws SecurityException, FileNotFoundException {
- final String PGE_1_NAME = "PGE1";
- final String PGE_2_NAME = "PGE2";
+ final String instanceId1 = "1234";
+ final String instanceId2 = "4321";
final StringBuffer pge1LogMessages = new StringBuffer("");
- PgeLogHandler handler1 = new PgeLogHandler(PGE_1_NAME, new OutputStream() {
+ PgeLogHandler handler1 = new PgeLogHandler(instanceId1, new OutputStream() {
@Override
public void write(int character) throws IOException {
pge1LogMessages.append((char) character);
}
});
final StringBuffer pge2LogMessages = new StringBuffer("");
- PgeLogHandler handler2 = new PgeLogHandler(PGE_2_NAME, new OutputStream() {
+ PgeLogHandler handler2 = new PgeLogHandler(instanceId2, new OutputStream() {
@Override
public void write(int character) throws IOException {
pge2LogMessages.append((char) character);
@@ -61,11 +61,11 @@ public class TestPgeLogHandler extends T
logger.addHandler(handler1);
logger.addHandler(handler2);
- logger.log(new PgeLogRecord(PGE_1_NAME, Level.INFO, "pge1 message1"));
- logger.log(new PgeLogRecord(PGE_1_NAME, Level.INFO, "pge1 message2"));
- logger.log(new PgeLogRecord(PGE_1_NAME, Level.INFO, "pge1 message3"));
+ logger.log(new PgeLogRecord(instanceId1, Level.INFO, "pge1 message1"));
+ logger.log(new PgeLogRecord(instanceId1, Level.INFO, "pge1 message2"));
+ logger.log(new PgeLogRecord(instanceId1, Level.INFO, "pge1 message3"));
- logger.log(new PgeLogRecord(PGE_2_NAME, Level.INFO, "pge2 message1"));
+ logger.log(new PgeLogRecord(instanceId2, Level.INFO, "pge2 message1"));
handler1.close();
handler2.close();
Added: oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/writers/MockSciPgeConfigFileWriter.java
URL: http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/writers/MockSciPgeConfigFileWriter.java?rev=1302040&view=auto
==============================================================================
--- oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/writers/MockSciPgeConfigFileWriter.java (added)
+++ oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/writers/MockSciPgeConfigFileWriter.java Sun Mar 18 00:01:57 2012
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge.writers;
+
+//JDK imports
+import java.io.File;
+import java.io.IOException;
+
+//Apache imports
+import org.apache.commons.io.FileUtils;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+
+/**
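+ * Mock implementation of {@link SciPgeConfigFileWriter} that creates any missing parent directories and touches the file at the requested path, ignoring the input metadata and custom args.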
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class MockSciPgeConfigFileWriter implements SciPgeConfigFileWriter {
+
+ public File createConfigFile(String sciPgeConfigFilePath,
+ Metadata inputMetadata, Object... customArgs) throws IOException {
+ File configFile = new File(sciPgeConfigFilePath);
+ configFile.getParentFile().mkdirs();
+ FileUtils.touch(configFile);
+ return configFile;
+ }
+}
Propchange: oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/writers/MockSciPgeConfigFileWriter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain